Commit 3c5e3cb

some more typing added

Seher Karakuzu authored and Seher Karakuzu committed Apr 5, 2024
1 parent b89b8cf commit 3c5e3cb
Showing 14 changed files with 150 additions and 95 deletions.
5 changes: 3 additions & 2 deletions tiled/adapters/awkward.py
@@ -3,6 +3,7 @@
 import awkward
 import awkward.forms
 from numpy.typing import NDArray
+from type_alliases import JSON

 from ..access_policies import DummyAccessPolicy, SimpleAccessPolicy
 from ..adapters.awkward_buffers import DirectoryContainer
@@ -68,7 +69,7 @@ def from_array(
             access_policy=access_policy,
         )

-    def metadata(self) -> dict[str, str]:
+    def metadata(self) -> JSON:
         """
         Returns
@@ -100,7 +101,7 @@ def read_buffers(self, form_keys: Optional[list[str]] = None) -> dict[Any, bytes
             buffers[key] = self.container[key]
         return buffers

-    def read(self) -> dict[str, bytes]:
+    def read(self) -> JSON:
         return dict(self.container)

     def write(self, container: DirectoryContainer) -> None:
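Note: the `JSON` alias these signatures now return is imported from `type_alliases`, a module not shown in this diff. A minimal sketch of such an alias (an assumption for illustration, not the actual contents of that module):

    # Hypothetical sketch of a JSON alias like the one imported above:
    # any JSON-serializable metadata mapping.
    from typing import Any, Dict

    JSON = Dict[str, Any]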
12 changes: 8 additions & 4 deletions tiled/adapters/csv.py
@@ -29,7 +29,7 @@ def read_csv(
     data_uri: str,
     structure: Optional[TableStructure] = None,
     metadata: Optional[JSON] = None,
-    specs: Optional[List[str]] = None,
+    specs: Optional[List[Spec]] = None,
     access_policy: Optional[Union[DummyAccessPolicy, SimpleAccessPolicy]] = None,
     **kwargs: Any,
 ) -> TableAdapter:
@@ -121,15 +121,19 @@ def init_storage(cls, data_uri: str, structure: TableStructure) -> Any:
         ]
         return assets

-    def append_partition(self, data: Any, partition: int) -> None:
+    def append_partition(
+        self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame], partition: int
+    ) -> None:
         uri = self._partition_paths[partition]
         data.to_csv(uri, index=False, mode="a", header=False)

-    def write_partition(self, data: Any, partition: int) -> None:
+    def write_partition(
+        self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame], partition: int
+    ) -> None:
         uri = self._partition_paths[partition]
         data.to_csv(uri, index=False)

-    def write(self, data: Any) -> None:
+    def write(self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame]) -> None:
         if self.structure().npartitions != 1:
             raise NotImplementedError
         uri = self._partition_paths[0]
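Note: the narrowed `data` annotations work because pandas and dask DataFrames share the same to_csv interface, which is all these methods call. A standalone sketch (an illustrative demo, not part of the commit):

    import dask.dataframe
    import pandas

    df = pandas.DataFrame({"x": [1, 2, 3]})
    ddf = dask.dataframe.from_pandas(df, npartitions=1)

    # Both types satisfy Union[dask.dataframe.DataFrame, pandas.DataFrame],
    # and both provide .to_csv, the only method the adapter invokes.
    df.to_csv("demo_pandas.csv", index=False)
    ddf.to_csv("demo_dask_*.csv", index=False)  # dask writes one file per partition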
6 changes: 4 additions & 2 deletions tiled/adapters/excel.py
@@ -1,3 +1,5 @@
+from typing import Any
+
 import dask.dataframe
 import pandas

@@ -8,7 +10,7 @@

 class ExcelAdapter(MapAdapter):
     @classmethod
-    def from_file(cls, file, **kwargs):
+    def from_file(cls, file: Any, **kwargs: Any) -> "ExcelAdapter":
         """
         Read the sheets in an Excel file.
@@ -52,7 +54,7 @@ def from_file(cls, file, **kwargs):
         return cls(mapping, **kwargs)

     @classmethod
-    def from_uri(cls, data_uri, **kwargs):
+    def from_uri(cls, data_uri: str, **kwargs: Any) -> "ExcelAdapter":
         """
         Read the sheets in an Excel file.
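Note: `file` stays typed as `Any` while `data_uri` is pinned down to str. A usage sketch (assumptions: the file name, and that an open pandas.ExcelFile is an accepted input, as the loose annotation suggests):

    import pandas
    from tiled.adapters.excel import ExcelAdapter

    excel_file = pandas.ExcelFile("measurements.xlsx")  # hypothetical file
    adapter = ExcelAdapter.from_file(excel_file)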
12 changes: 6 additions & 6 deletions tiled/adapters/hdf5.py
@@ -7,7 +7,7 @@
 import h5py
 import numpy
 from numpy._typing import NDArray
-from type_alliases import HDF5, Spec
+from type_alliases import JSON, Spec

 from ..access_policies import DummyAccessPolicy, SimpleAccessPolicy
 from ..adapters.utils import IndexersMixin
@@ -62,7 +62,7 @@ def __init__(
         node: Any,
         *,
         structure: Optional[TableStructure] = None,
-        metadata: Optional[HDF5] = None,
+        metadata: Optional[JSON] = None,
         specs: Optional[list[Spec]] = None,
         access_policy: Optional[Union[SimpleAccessPolicy, DummyAccessPolicy]] = None,
     ) -> None:
@@ -78,7 +78,7 @@ def from_file(
         file: Any,
         *,
         structure: Optional[TableStructure] = None,
-        metadata: HDF5 = None,
+        metadata: JSON = None,
         swmr: bool = SWMR_DEFAULT,
         libver: str = "latest",
         specs: Optional[List[Spec]] = None,
@@ -92,7 +92,7 @@ def from_uri(
         data_uri: Union[str, list[str]],
         *,
         structure: Optional[TableStructure] = None,
-        metadata: Optional[HDF5] = None,
+        metadata: Optional[JSON] = None,
         swmr: bool = SWMR_DEFAULT,
         libver: str = "latest",
         specs: Optional[list[Spec]] = None,
@@ -112,7 +112,7 @@ def access_policy(self) -> Optional[Union[SimpleAccessPolicy, DummyAccessPolicy]
     def structure(self) -> None:
         return None

-    def metadata(self) -> HDF5:
+    def metadata(self) -> JSON:
         d = dict(self._node.attrs)
         for k, v in list(d.items()):
             # Convert any bytes to str.
@@ -197,7 +197,7 @@ def hdf5_lookup(
     data_uri: Union[str, list[str]],
     *,
     structure: Optional[TableStructure] = None,
-    metadata: Optional[HDF5] = None,
+    metadata: Optional[JSON] = None,
     swmr: bool = SWMR_DEFAULT,
     libver: str = "latest",
     specs: Optional[List[Spec]] = None,
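Note: replacing the HDF5 alias with JSON in metadata's return type matches the body shown above, which coerces HDF5 attributes into JSON-friendly values. A simplified standalone sketch of that coercion (an illustration with a hypothetical helper name, not the adapter's exact code):

    def attrs_to_json(attrs: dict) -> dict:
        d = dict(attrs)
        for k, v in list(d.items()):
            if isinstance(v, bytes):
                d[k] = v.decode("utf-8")  # JSON has no bytes type
        return d

    print(attrs_to_json({"detector": b"pilatus", "exposure": 0.1}))
    # {'detector': 'pilatus', 'exposure': 0.1}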
5 changes: 4 additions & 1 deletion tiled/adapters/netcdf.py
@@ -1,8 +1,11 @@
+from pathlib import Path
+from typing import Union
+
 import xarray

 from .xarray import DatasetAdapter


-def read_netcdf(filepath):
+def read_netcdf(filepath: Union[str, list[str], Path]) -> DatasetAdapter:
     ds = xarray.open_dataset(filepath, decode_times=False)
     return DatasetAdapter.from_dataset(ds)
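Note: a usage sketch of the annotated function (the file name is hypothetical). str and pathlib.Path are both accepted by xarray.open_dataset; the list[str] member of the Union would only work if the underlying reader supports it:

    from pathlib import Path
    from tiled.adapters.netcdf import read_netcdf

    adapter = read_netcdf("scan.nc")        # hypothetical file
    adapter = read_netcdf(Path("scan.nc"))  # a Path works equally well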
6 changes: 4 additions & 2 deletions tiled/adapters/parquet.py
@@ -63,11 +63,13 @@ def init_storage(
         ]
         return assets

-    def write_partition(self, data: Any, partition: int) -> None:
+    def write_partition(
+        self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame], partition: int
+    ) -> None:
         uri = self._partition_paths[partition]
         data.to_parquet(uri)

-    def write(self, data: Any) -> None:
+    def write(self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame]) -> None:
         if self.structure().npartitions != 1:
             raise NotImplementedError
         uri = self._partition_paths[0]
71 changes: 44 additions & 27 deletions tiled/adapters/sparse.py
@@ -1,6 +1,13 @@
+from typing import Any, Optional, Tuple, Union

+import dask
+import numpy
+import pandas
 import sparse
+from numpy._typing import NDArray
+from type_alliases import JSON, Spec

+from ..access_policies import DummyAccessPolicy, SimpleAccessPolicy
 from ..structures.core import StructureFamily
 from ..structures.sparse import COOStructure
 from .array import slice_and_shape_from_block_and_chunks
@@ -13,14 +20,14 @@ class COOAdapter:
     @classmethod
     def from_arrays(
         cls,
-        coords,
-        data,
-        shape,
-        dims=None,
-        metadata=None,
-        specs=None,
-        access_policy=None,
-    ):
+        coords: NDArray[Any],
+        data: Union[dask.dataframe.DataFrame, pandas.DataFrame],
+        shape: Tuple[int, ...],
+        dims: Optional[Tuple[str, ...]] = None,
+        metadata: Optional[JSON] = None,
+        specs: Optional[list[Spec]] = None,
+        access_policy: Optional[Union[SimpleAccessPolicy, DummyAccessPolicy]] = None,
+    ) -> "COOAdapter":
         """
         Simplest constructor. Single chunk from coords, data arrays.
         """
@@ -39,7 +46,7 @@ def from_arrays(
         )

     @classmethod
-    def from_coo(cls, coo, *, dims=None, metadata=None, specs=None, access_policy=None):
+    def from_coo(
+        cls,
+        coo: sparse.COO,
+        *,
+        dims: Optional[Tuple[str, ...]] = None,
+        metadata: Optional[JSON] = None,
+        specs: Optional[list[Spec]] = None,
+        access_policy: Optional[Union[SimpleAccessPolicy, DummyAccessPolicy]] = None,
+    ) -> "COOAdapter":
         "Construct from sparse.COO object."
         return cls.from_arrays(
             coords=coo.coords,
@@ -54,15 +69,15 @@ def from_coo(cls, coo, *, dims=None, metadata=None, specs=None, access_policy=No
     @classmethod
     def from_global_ref(
         cls,
-        blocks,
-        shape,
-        chunks,
+        blocks: dict[Tuple[int, ...], Tuple[NDArray[Any], Any]],
+        shape: Tuple[int, ...],
+        chunks: Tuple[Tuple[int, ...], ...],
         *,
-        dims=None,
-        metadata=None,
-        specs=None,
-        access_policy=None,
-    ):
+        dims: Optional[Tuple[str, ...]] = None,
+        metadata: Optional[JSON] = None,
+        specs: Optional[list[Spec]] = None,
+        access_policy: Optional[Union[SimpleAccessPolicy, DummyAccessPolicy]] = None,
+    ) -> "COOAdapter":
         """
         Construct from blocks with coords given in global reference frame.
         """
@@ -90,13 +105,13 @@ def from_global_ref(

     def __init__(
         self,
-        blocks,
-        structure,
+        blocks: dict[Tuple[int, ...], Tuple[NDArray[Any], Any]],
+        structure: COOStructure,
         *,
-        metadata=None,
-        specs=None,
-        access_policy=None,
-    ):
+        metadata: Optional[JSON] = None,
+        specs: Optional[list[Spec]] = None,
+        access_policy: Optional[Union[SimpleAccessPolicy, DummyAccessPolicy]] = None,
+    ) -> None:
         """
         Construct from blocks with coords given in block-local reference frame.
         """
@@ -106,21 +121,23 @@ def __init__(
         self.specs = specs or []
         self.access_policy = access_policy

-    def metadata(self):
+    def metadata(self) -> JSON:
         return self._metadata

-    def structure(self):
+    def structure(self) -> COOStructure:
         return self._structure

-    def read_block(self, block, slice=None):
+    def read_block(
+        self, block: Tuple[int, ...], slice: Optional[Union[int, slice]] = None
+    ) -> NDArray[Any]:
         coords, data = self.blocks[block]
         _, shape = slice_and_shape_from_block_and_chunks(block, self._structure.chunks)
         arr = sparse.COO(data=data[:], coords=coords[:], shape=shape)
         if slice:
             arr = arr[slice]
         return arr

-    def read(self, slice=None):
+    def read(self, slice: Optional[Union[int, slice]] = None) -> NDArray[Any]:
         all_coords = []
         all_data = []
         for block, (coords, data) in self.blocks.items():
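Note: a construction sketch for the newly annotated classmethods (the values are hypothetical). As the hunk above shows, from_coo delegates to from_arrays with the COO object's own coords and data:

    import numpy
    import sparse
    from tiled.adapters.sparse import COOAdapter

    # A 2x2 sparse array with two nonzero entries.
    coo = sparse.COO(
        coords=numpy.array([[0, 1], [1, 0]]),  # [row indices, column indices]
        data=numpy.array([3.5, 7.0]),
        shape=(2, 2),
    )
    adapter = COOAdapter.from_coo(coo)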
45 changes: 29 additions & 16 deletions tiled/adapters/sparse_blocks_parquet.py
@@ -1,15 +1,22 @@
 import itertools
+from typing import Any, Optional, Tuple, Union
+
+import dask.base
 import dask.dataframe
 import numpy
 import pandas
 import sparse
+from numpy._typing import NDArray
+
+from ..access_policies import DummyAccessPolicy, SimpleAccessPolicy
 from ..adapters.array import slice_and_shape_from_block_and_chunks
 from ..structures.core import StructureFamily
 from ..structures.sparse import COOStructure
 from ..utils import path_from_uri
+from .type_alliases import JSON, Spec


-def load_block(uri):
+def load_block(uri: str) -> Tuple[list[int], Tuple[NDArray[Any], Any]]:
     # TODO This can be done without pandas.
     # Better to use a plain I/O library.
     df = pandas.read_parquet(path_from_uri(uri))
@@ -23,12 +30,12 @@ class SparseBlocksParquetAdapter:

     def __init__(
         self,
-        data_uris,
-        structure,
-        metadata=None,
-        specs=None,
-        access_policy=None,
-    ):
+        data_uris: Union[str, list[str]],
+        structure: COOStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[list[Spec]] = None,
+        access_policy: Optional[Union[SimpleAccessPolicy, DummyAccessPolicy]] = None,
+    ) -> None:
         num_blocks = (range(len(n)) for n in structure.chunks)
         self.blocks = {}
         for block, uri in zip(itertools.product(*num_blocks), data_uris):
@@ -41,9 +48,9 @@ def __init__(
     @classmethod
     def init_storage(
         cls,
-        data_uri,
-        structure,
-    ):
+        data_uri: Union[str, list[str]],
+        structure: COOStructure,
+    ) -> Any:
         from ..server.schemas import Asset

         directory = path_from_uri(data_uri)
@@ -61,20 +68,24 @@ def init_storage(
         ]
         return assets

-    def metadata(self):
+    def metadata(self) -> JSON:
         return self._metadata

-    def write_block(self, data, block):
+    def write_block(
+        self,
+        data: Union[dask.dataframe.DataFrame, pandas.DataFrame],
+        block: Tuple[int, ...],
+    ) -> None:
         uri = self.blocks[block]
         data.to_parquet(path_from_uri(uri))

-    def write(self, data):
+    def write(self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame]) -> None:
         if len(self.blocks) > 1:
             raise NotImplementedError
         uri = self.blocks[(0,) * len(self._structure.shape)]
         data.to_parquet(path_from_uri(uri))

-    def read(self, slice=...):
+    def read(self, slice: Optional[Union[int, slice]]) -> NDArray[Any]:
         all_coords = []
         all_data = []
         for block, uri in self.blocks.items():
@@ -93,11 +104,13 @@ def read(self, slice=...):
             )
         return arr[slice]

-    def read_block(self, block, slice=...):
+    def read_block(
+        self, block: Tuple[int, ...], slice: Optional[Union[int, slice]]
+    ) -> NDArray[Any]:
         coords, data = load_block(self.blocks[block])
         _, shape = slice_and_shape_from_block_and_chunks(block, self._structure.chunks)
         arr = sparse.COO(data=data[:], coords=coords[:], shape=shape)
         return arr[slice]

-    def structure(self):
+    def structure(self) -> COOStructure:
         return self._structure
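Note: the constructor above pairs each data URI with a block index produced by itertools.product over the per-dimension chunk counts. A standalone sketch of that enumeration (the chunking is a made-up example):

    import itertools

    # COOStructure.chunks style: one tuple of chunk sizes per dimension,
    # e.g. a (4, 6)-shaped array split into 2 x 3 blocks.
    chunks = ((2, 2), (2, 2, 2))
    num_blocks = (range(len(n)) for n in chunks)
    print(list(itertools.product(*num_blocks)))
    # [(0, 0), (0, 1), (0, 2), (1, 0), (1, 1), (1, 2)]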