From 06355c145b49cb08567345e1d24cf227fd90fd92 Mon Sep 17 00:00:00 2001
From: Dan Allan
Date: Fri, 1 Mar 2024 10:40:11 -0500
Subject: [PATCH] Support writing tables as CSV.

---
 tiled/_tests/test_writing.py | 37 ++++++++++++++++++
 tiled/adapters/csv.py        | 75 ++++++++++++++++++++++++++++++++++++
 tiled/catalog/adapter.py     | 30 ++++++++++-----
 tiled/mimetypes.py           |  2 +-
 4 files changed, 134 insertions(+), 10 deletions(-)

diff --git a/tiled/_tests/test_writing.py b/tiled/_tests/test_writing.py
index e499a1282..10993be28 100644
--- a/tiled/_tests/test_writing.py
+++ b/tiled/_tests/test_writing.py
@@ -17,6 +17,7 @@
 
 from ..catalog import in_memory
 from ..client import Context, from_context, record_history
+from ..mimetypes import PARQUET_MIMETYPE
 from ..queries import Key
 from ..server.app import build_app
 from ..structures.array import ArrayStructure
@@ -622,3 +623,39 @@ def test_union_table_column_array_key_collision(tree):
         ],
         key="x",
     )
+
+
+def test_write_with_specified_mimetype(tree):
+    with Context.from_app(build_app(tree)) as context:
+        client = from_context(context, include_data_sources=True)
+        df = pandas.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
+        structure = TableStructure.from_pandas(df)
+
+        for mimetype in [PARQUET_MIMETYPE, "text/csv"]:
+            x = client.new(
+                "table",
+                [
+                    DataSource(
+                        structure_family=StructureFamily.table,
+                        structure=structure,
+                        mimetype=mimetype,
+                    ),
+                ],
+            )
+            x.write_partition(df, 0)
+            x.read()
+            x.refresh()
+            assert x.data_sources()[0]["mimetype"] == mimetype
+
+        # Specifying unsupported mimetype raises expected error.
+        with fail_with_status_code(415):
+            client.new(
+                "table",
+                [
+                    DataSource(
+                        structure_family=StructureFamily.table,
+                        structure=structure,
+                        mimetype="application/x-does-not-exist",
+                    ),
+                ],
+            )
diff --git a/tiled/adapters/csv.py b/tiled/adapters/csv.py
index eda87889a..6c66bed56 100644
--- a/tiled/adapters/csv.py
+++ b/tiled/adapters/csv.py
@@ -1,6 +1,9 @@
+from pathlib import Path
+
 import dask.dataframe
 
 from ..server.object_cache import NO_CACHE, get_object_cache
+from ..structures.core import StructureFamily
 from ..utils import path_from_uri
 from .dataframe import DataFrameAdapter
 
@@ -48,3 +51,75 @@ def read_csv(
     """
     + dask.dataframe.read_csv.__doc__
 )
+
+
+class CSVAdapter:
+    structure_family = StructureFamily.table
+
+    def __init__(
+        self,
+        data_uris,
+        structure,
+        metadata=None,
+        specs=None,
+        access_policy=None,
+    ):
+        # TODO Store data_uris instead and generalize to non-file schemes.
+        self._partition_paths = [path_from_uri(uri) for uri in data_uris]
+        self._metadata = metadata or {}
+        self._structure = structure
+        self.specs = list(specs or [])
+        self.access_policy = access_policy
+
+    def metadata(self):
+        return self._metadata
+
+    @property
+    def dataframe_adapter(self):
+        partitions = []
+        for path in self._partition_paths:
+            if not Path(path).exists():
+                partition = None
+            else:
+                partition = dask.dataframe.read_csv(path)
+            partitions.append(partition)
+        return DataFrameAdapter(partitions, self._structure)
+
+    @classmethod
+    def init_storage(cls, data_uri, structure):
+        from ..server.schemas import Asset
+
+        directory = path_from_uri(data_uri)
+        directory.mkdir(parents=True, exist_ok=True)
+        assets = [
+            Asset(
+                data_uri=f"{data_uri}/partition-{i}.csv",
+                is_directory=False,
+                parameter="data_uris",
+                num=i,
+            )
+            for i in range(structure.npartitions)
+        ]
+        return assets
+
+    def write_partition(self, data, partition):
+        uri = self._partition_paths[partition]
+        data.to_csv(uri, index=False)
+
+    def write(self, data):
+        if self.structure().npartitions != 1:
+            raise NotImplementedError
+        uri = self._partition_paths[0]
+        data.to_csv(uri, index=False)
+
+    def read(self, *args, **kwargs):
+        return self.dataframe_adapter.read(*args, **kwargs)
+
+    def read_partition(self, *args, **kwargs):
+        return self.dataframe_adapter.read_partition(*args, **kwargs)
+
+    def structure(self):
+        return self._structure
+
+    def get(self, key):
+        return self.dataframe_adapter.get(key)
diff --git a/tiled/catalog/adapter.py b/tiled/catalog/adapter.py
index 27dd255ed..040f43192 100644
--- a/tiled/catalog/adapter.py
+++ b/tiled/catalog/adapter.py
@@ -74,18 +74,21 @@
     StructureFamily.table: PARQUET_MIMETYPE,
     StructureFamily.sparse: SPARSE_BLOCKS_PARQUET_MIMETYPE,
 }
-DEFAULT_INIT_STORAGE = OneShotCachedMap(
+INIT_STORAGE = OneShotCachedMap(
     {
-        StructureFamily.array: lambda: importlib.import_module(
+        ZARR_MIMETYPE: lambda: importlib.import_module(
             "...adapters.zarr", __name__
         ).ZarrArrayAdapter.init_storage,
-        StructureFamily.awkward: lambda: importlib.import_module(
+        AWKWARD_BUFFERS_MIMETYPE: lambda: importlib.import_module(
             "...adapters.awkward_buffers", __name__
         ).AwkwardBuffersAdapter.init_storage,
-        StructureFamily.table: lambda: importlib.import_module(
+        PARQUET_MIMETYPE: lambda: importlib.import_module(
             "...adapters.parquet", __name__
         ).ParquetDatasetAdapter.init_storage,
-        StructureFamily.sparse: lambda: importlib.import_module(
+        "text/csv": lambda: importlib.import_module(
+            "...adapters.csv", __name__
+        ).CSVAdapter.init_storage,
+        SPARSE_BLOCKS_PARQUET_MIMETYPE: lambda: importlib.import_module(
             "...adapters.sparse_blocks_parquet", __name__
         ).SparseBlocksParquetAdapter.init_storage,
     }
@@ -620,9 +623,10 @@ async def create_node(
             if data_source.management != Management.external:
                 if structure_family == StructureFamily.container:
                     raise NotImplementedError(structure_family)
-                data_source.mimetype = DEFAULT_CREATION_MIMETYPE[
-                    data_source.structure_family
-                ]
+                if data_source.mimetype is None:
+                    data_source.mimetype = DEFAULT_CREATION_MIMETYPE[
+                        data_source.structure_family
+                    ]
                 data_source.parameters = {}
                 data_uri_path_parts = self.segments + [key]
                 if structure_family == StructureFamily.union:
@@ -630,7 +634,15 @@
                 data_uri = str(self.context.writable_storage) + "".join(
                     f"/{quote_plus(segment)}" for segment in data_uri_path_parts
                 )
-                init_storage = DEFAULT_INIT_STORAGE[data_source.structure_family]
+                if data_source.mimetype not in INIT_STORAGE:
+                    raise HTTPException(
+                        status_code=415,
+                        detail=(
+                            f"The given data source mimetype, {data_source.mimetype}, "
+                            "is not one that the Tiled server knows how to write."
+                        ),
+                    )
+                init_storage = INIT_STORAGE[data_source.mimetype]
                 assets = await ensure_awaitable(
                     init_storage, data_uri, data_source.structure
                 )
diff --git a/tiled/mimetypes.py b/tiled/mimetypes.py
index ce67040a4..22bb4602b 100644
--- a/tiled/mimetypes.py
+++ b/tiled/mimetypes.py
@@ -20,7 +20,7 @@
     ).TiffSequenceAdapter.from_uris,
     "text/csv": lambda: importlib.import_module(
         "..adapters.csv", __name__
-    ).read_csv,
+    ).CSVAdapter,
     XLSX_MIME_TYPE: lambda: importlib.import_module(
         "..adapters.excel", __name__
     ).ExcelAdapter.from_uri,