Refactor init_storage interface for SQL.
danielballan committed Oct 3, 2024
1 parent 7679190 commit 92097a8
Showing 6 changed files with 70 additions and 66 deletions.
20 changes: 17 additions & 3 deletions tiled/adapters/awkward_buffers.py
@@ -1,12 +1,14 @@
"""
A directory containing awkward buffers, one file per form key.
"""

from pathlib import Path
from typing import List, Optional
from urllib.parse import quote_plus

import awkward.forms

from ..server.schemas import Asset
from ..server.schemas import Asset, DataSource, Storage
from ..structures.awkward import AwkwardStructure
from ..structures.core import Spec, StructureFamily
from ..utils import path_from_uri
@@ -22,7 +24,12 @@ class AwkwardBuffersAdapter(AwkwardAdapter):
structure_family = StructureFamily.awkward

@classmethod
def init_storage(cls, data_uri: str, structure: AwkwardStructure) -> List[Asset]:
def init_storage(
cls,
storage: Storage,
data_source: DataSource[AwkwardStructure],
path_parts: List[str],
) -> DataSource[AwkwardStructure]:
"""
Parameters
@@ -34,9 +41,16 @@ def init_storage(cls, data_uri: str, structure: AwkwardStructure) -> List[Asset]
-------
"""
data_source = data_source.copy() # Do not mutate caller input.
data_uri = str(storage.filesystem) + "".join(
f"/{quote_plus(segment)}" for segment in path_parts
)
directory: Path = path_from_uri(data_uri)
directory.mkdir(parents=True, exist_ok=True)
return [Asset(data_uri=data_uri, is_directory=True, parameter="data_uri")]
data_source.assets.append(
Asset(data_uri=data_uri, is_directory=True, parameter="data_uri")
)
return data_source

@classmethod
def from_directory(
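All three file-backed adapters touched by this commit now build the destination URI the same way: the writable filesystem URI carried by `Storage`, plus one percent-encoded segment per path part. A minimal sketch of that step, assuming `storage.filesystem` is a URI string such as `file:///srv/tiled/data` (an illustrative value, not taken from the repository):

```python
# Sketch of the URI-building step shared by the refactored init_storage methods.
# The filesystem URI and path parts below are illustrative values.
from urllib.parse import quote_plus

def build_data_uri(filesystem_uri: str, path_parts: list) -> str:
    # Percent-encode each segment so keys containing "/" or spaces stay
    # inside the storage root rather than escaping it.
    return str(filesystem_uri) + "".join(
        f"/{quote_plus(segment)}" for segment in path_parts
    )

print(build_data_uri("file:///srv/tiled/data", ["raw", "scan 42"]))
# file:///srv/tiled/data/raw/scan+42
```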
19 changes: 13 additions & 6 deletions tiled/adapters/sparse_blocks_parquet.py
@@ -1,5 +1,6 @@
import itertools
from typing import Any, List, Optional, Tuple, Union
from urllib.parse import quote_plus

import dask.base
import dask.dataframe
@@ -9,7 +10,7 @@
from numpy._typing import NDArray

from ..adapters.array import slice_and_shape_from_block_and_chunks
from ..server.schemas import Asset
from ..server.schemas import Asset, DataSource, Storage
from ..structures.core import Spec, StructureFamily
from ..structures.sparse import COOStructure
from ..utils import path_from_uri
@@ -71,9 +72,10 @@ def __init__(
@classmethod
def init_storage(
cls,
data_uri: str,
structure: COOStructure,
) -> List[Asset]:
storage: Storage,
data_source: DataSource[COOStructure],
path_parts: List[str],
) -> DataSource[COOStructure]:
"""
Parameters
@@ -85,10 +87,14 @@ def init_storage(
-------
"""
data_source = data_source.copy() # Do not mutate caller input.
data_uri = str(storage.filesystem) + "".join(
f"/{quote_plus(segment)}" for segment in path_parts
)
directory = path_from_uri(data_uri)
directory.mkdir(parents=True, exist_ok=True)

num_blocks = (range(len(n)) for n in structure.chunks)
num_blocks = (range(len(n)) for n in data_source.structure.chunks)
assets = [
Asset(
data_uri=f"{data_uri}/block-{'.'.join(map(str, block))}.parquet",
@@ -98,7 +104,8 @@ def init_storage(
)
for i, block in enumerate(itertools.product(*num_blocks))
]
return assets
data_source.assets.extend(assets)
return data_source

def metadata(self) -> JSON:
"""
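The sparse adapter registers one Parquet asset per chunk block, with the block indices joined into the file name. A small worked example of the enumeration loop above, using a made-up `chunks` value:

```python
# Worked example of the block-enumeration loop in init_storage above.
# The chunks tuple is illustrative: two blocks along axis 0, one along axis 1.
import itertools

chunks = ((5, 5), (10,))
num_blocks = (range(len(n)) for n in chunks)
for i, block in enumerate(itertools.product(*num_blocks)):
    print(i, f"block-{'.'.join(map(str, block))}.parquet")
# 0 block-0.0.parquet
# 1 block-1.0.parquet
```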
29 changes: 2 additions & 27 deletions tiled/adapters/sql.py
@@ -24,6 +26,8 @@ def __init__(
self,
data_uri: str,
structure: TableStructure,
table_name: str,
dataset_id: int,
metadata: Optional[JSON] = None,
specs: Optional[List[Spec]] = None,
access_policy: Optional[AccessPolicy] = None,
@@ -113,33 +115,6 @@ def get(self, key: str) -> Union[ArrayAdapter, None]:
return None
return ArrayAdapter.from_array(self.read([key])[key].values)

@classmethod
def from_single_file(
cls,
data_uri: str,
structure: TableStructure,
metadata: Optional[JSON] = None,
specs: Optional[List[Spec]] = None,
access_policy: Optional[AccessPolicy] = None,
) -> "SQLAdapter":
"""
Construct the SQLAdapter object from `from_single_file` classmethod.
Parameters
----------
data_uri : the uri of the database, starting either with "sqlite://" or "postgresql://"
structure : the structure of the data. structure is not optional for sql database
metadata : the optional metadata of the data.
specs : the specs.
access_policy : the access policy of the data.
"""
return cls(
data_uri,
structure=structure,
metadata=metadata,
specs=specs,
access_policy=access_policy,
)

def __getitem__(self, key: str) -> ArrayAdapter:
"""
Get the data for a specific key.
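With `from_single_file` removed, callers construct `SQLAdapter` directly, passing the two new required arguments. A hedged sketch under stated assumptions: the database URI, table name, and dataset id are illustrative values, and `TableStructure.from_pandas` is assumed to be the structure-building helper.

```python
# Hedged sketch only: the URI, table_name, and dataset_id values are
# illustrative, and TableStructure.from_pandas is an assumed helper for
# building the structure.
import pandas
from tiled.adapters.sql import SQLAdapter
from tiled.structures.table import TableStructure

df = pandas.DataFrame({"x": [1, 2, 3], "y": [4.0, 5.0, 6.0]})
adapter = SQLAdapter(
    "sqlite:////tmp/example.db",            # or "postgresql://..."
    structure=TableStructure.from_pandas(df),
    table_name="measurements",              # new required argument in this commit
    dataset_id=1,                           # new required argument in this commit
)
```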
25 changes: 18 additions & 7 deletions tiled/adapters/zarr.py
@@ -3,6 +3,7 @@
import os
import sys
from typing import Any, Iterator, List, Optional, Tuple, Union
from urllib.parse import quote_plus

import zarr.core
import zarr.hierarchy
@@ -11,7 +12,7 @@

from ..adapters.utils import IndexersMixin
from ..iterviews import ItemsView, KeysView, ValuesView
from ..server.schemas import Asset
from ..server.schemas import Asset, DataSource, Storage
from ..structures.array import ArrayStructure
from ..structures.core import Spec, StructureFamily
from ..utils import node_repr, path_from_uri
@@ -56,7 +57,12 @@ class ZarrArrayAdapter(ArrayAdapter):
""" """

@classmethod
def init_storage(cls, data_uri: str, structure: ArrayStructure) -> List[Asset]:
def init_storage(
cls,
storage: Storage,
data_source: DataSource[ArrayStructure],
path_parts: List[str],
) -> DataSource[ArrayStructure]:
"""
Parameters
@@ -68,10 +74,14 @@ def init_storage(cls, data_uri: str, structure: ArrayStructure) -> List[Asset]:
-------
"""
data_source = data_source.copy() # Do not mutate caller input.
data_uri = str(storage.filesystem) + "".join(
f"/{quote_plus(segment)}" for segment in path_parts
)
# Zarr requires evenly-sized chunks within each dimension.
# Use the first chunk along each dimension.
zarr_chunks = tuple(dim[0] for dim in structure.chunks)
shape = tuple(dim[0] * len(dim) for dim in structure.chunks)
zarr_chunks = tuple(dim[0] for dim in data_source.structure.chunks)
shape = tuple(dim[0] * len(dim) for dim in data_source.structure.chunks)
directory = path_from_uri(data_uri)
directory.mkdir(parents=True, exist_ok=True)
storage = zarr.storage.DirectoryStore(str(directory))
@@ -80,15 +90,16 @@
storage,
shape=shape,
chunks=zarr_chunks,
dtype=structure.data_type.to_numpy_dtype(),
dtype=data_source.structure.data_type.to_numpy_dtype(),
)
return [
data_source.assets.append(
Asset(
data_uri=data_uri,
is_directory=True,
parameter="data_uri",
)
]
)
return data_source

def _stencil(self) -> Tuple[slice, ...]:
"""
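Zarr requires evenly sized chunks within each dimension, so the refactored `init_storage` takes the first chunk length along each axis and computes the overall shape as that length times the number of chunks. A worked example of the arithmetic with an illustrative `chunks` value:

```python
# Worked example of the chunk arithmetic in ZarrArrayAdapter.init_storage above.
# The chunks value is illustrative: a (300, 100) array chunked into 100 x 50 tiles.
chunks = ((100, 100, 100), (50, 50))

zarr_chunks = tuple(dim[0] for dim in chunks)        # (100, 50)
shape = tuple(dim[0] * len(dim) for dim in chunks)   # (300, 100)
```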
19 changes: 9 additions & 10 deletions tiled/catalog/adapter.py
@@ -11,7 +11,7 @@
from functools import partial, reduce
from pathlib import Path
from typing import Callable, Dict
from urllib.parse import quote_plus, urlparse
from urllib.parse import urlparse

import anyio
from fastapi import HTTPException
@@ -61,7 +61,7 @@
ZARR_MIMETYPE,
)
from ..query_registration import QueryTranslationRegistry
from ..server.schemas import Asset, DataSource, Management, Revision, Spec
from ..server.schemas import Asset, DataSource, Management, Revision, Spec, Storage
from ..structures.core import StructureFamily
from ..utils import (
UNCHANGED,
@@ -159,7 +159,7 @@ def __init__(
)
# If it is writable, it is automatically also readable.
readable_storage.append(writable_storage)
self.writable_storage = writable_storage
self.writable_storage = Storage(filesystem=writable_storage, sql=None)
self.readable_storage = [ensure_uri(path) for path in readable_storage]
self.key_maker = key_maker
adapters_by_mimetype = adapters_by_mimetype or {}
@@ -327,7 +327,7 @@ async def shutdown(self):

@property
def writable(self):
return bool(self.context.writable_storage)
return any(self.context.writable_storage.values())

def __repr__(self):
return f"<{type(self).__name__} /{'/'.join(self.segments)}>"
@@ -643,9 +643,6 @@ async def create_node(
data_source.structure_family
]
data_source.parameters = {}
data_uri = str(self.context.writable_storage) + "".join(
f"/{quote_plus(segment)}" for segment in (self.segments + [key])
)
if data_source.mimetype not in INIT_STORAGE:
raise HTTPException(
status_code=415,
@@ -655,10 +652,12 @@
),
)
init_storage = INIT_STORAGE[data_source.mimetype]
assets = await ensure_awaitable(
init_storage, data_uri, data_source.structure
data_source = await ensure_awaitable(
init_storage,
self.context.writable_storage,
data_source,
self.segments + [key],
)
data_source.assets.extend(assets)
else:
if data_source.mimetype not in self.context.adapters_by_mimetype:
raise HTTPException(
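The catalog dispatches to the registered `init_storage` through `ensure_awaitable`, so an adapter may implement it as either a plain function or a coroutine. Below is a self-contained sketch of that pattern; it is an illustrative re-implementation, not the `tiled.utils` helper itself, and the sample callables are hypothetical stand-ins.

```python
# Illustrative re-implementation of the ensure_awaitable pattern used in
# create_node above; not the tiled.utils helper itself. The sample
# init_storage callables are hypothetical stand-ins.
import asyncio
import inspect

async def ensure_awaitable_sketch(func, *args):
    # Call the function, then await the result only if it is a coroutine.
    result = func(*args)
    if inspect.isawaitable(result):
        return await result
    return result

def sync_init_storage(storage, data_source, path_parts):
    return f"{storage}: {'/'.join(path_parts)}"

async def async_init_storage(storage, data_source, path_parts):
    return f"{storage}: {'/'.join(path_parts)}"

async def main():
    print(await ensure_awaitable_sketch(sync_init_storage, "fs", None, ["a", "b"]))
    print(await ensure_awaitable_sketch(async_init_storage, "fs", None, ["a", "b"]))

asyncio.run(main())
```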
24 changes: 11 additions & 13 deletions tiled/server/schemas.py
@@ -24,6 +24,7 @@
DataT = TypeVar("DataT")
LinksT = TypeVar("LinksT")
MetaT = TypeVar("MetaT")
StructureT = TypeVar("StructureT")

MAX_ALLOWED_SPECS = 20

@@ -138,19 +139,11 @@ def from_orm(cls, orm: tiled.catalog.orm.Revision) -> Revision:
)


class DataSource(pydantic.BaseModel):
id: Optional[int] = None
structure_family: Optional[StructureFamily] = None
structure: Optional[
Union[
ArrayStructure,
AwkwardStructure,
SparseStructure,
NodeStructure,
TableStructure,
]
] = None
mimetype: Optional[str] = None
class DataSource(pydantic.BaseModel, Generic[StructureT]):
id: int
structure_family: StructureFamily
structure: StructureT
mimetype: str
parameters: dict = {}
assets: List[Asset] = []
management: Management = Management.writable
@@ -567,4 +560,9 @@ class PatchMetadataResponse(pydantic.BaseModel, Generic[ResourceLinksT]):
data_sources: Optional[List[DataSource]]


class Storage(pydantic.BaseModel):
filesystem: Optional[str]
sql: Optional[str]


NodeStructure.model_rebuild()
