Skip to content

Commit

Permalink
Support writing tables as CSV.
Browse files Browse the repository at this point in the history
  • Loading branch information
danielballan committed Mar 1, 2024
1 parent a407021 commit 06355c1
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 10 deletions.
37 changes: 37 additions & 0 deletions tiled/_tests/test_writing.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from ..catalog import in_memory
from ..client import Context, from_context, record_history
from ..mimetypes import PARQUET_MIMETYPE
from ..queries import Key
from ..server.app import build_app
from ..structures.array import ArrayStructure
Expand Down Expand Up @@ -622,3 +623,39 @@ def test_union_table_column_array_key_collision(tree):
],
key="x",
)


def test_write_with_specified_mimetype(tree):
    """Writing a table with an explicit mimetype selects the matching writer.

    Each supported table mimetype (Parquet, CSV) should be accepted and
    recorded on the data source; an unknown mimetype should be rejected by
    the server with 415 Unsupported Media Type.
    """
    with Context.from_app(build_app(tree)) as context:
        client = from_context(context, include_data_sources=True)
        df = pandas.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
        structure = TableStructure.from_pandas(df)

        for mimetype in [PARQUET_MIMETYPE, "text/csv"]:
            x = client.new(
                "table",
                [
                    DataSource(
                        structure_family=StructureFamily.table,
                        structure=structure,
                        mimetype=mimetype,
                    ),
                ],
            )
            x.write_partition(df, 0)
            x.read()  # smoke check: data is readable back after the write
            x.refresh()
            # Fix: the original line was a bare `==` comparison whose result
            # was discarded, so this check never actually ran.
            assert x.data_sources()[0]["mimetype"] == mimetype

        # Specifying unsupported mimetype raises expected error.
        with fail_with_status_code(415):
            client.new(
                "table",
                [
                    DataSource(
                        structure_family=StructureFamily.table,
                        structure=structure,
                        mimetype="application/x-does-not-exist",
                    ),
                ],
            )
75 changes: 75 additions & 0 deletions tiled/adapters/csv.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from pathlib import Path

import dask.dataframe

from ..server.object_cache import NO_CACHE, get_object_cache
from ..structures.core import StructureFamily
from ..utils import path_from_uri
from .dataframe import DataFrameAdapter

Expand Down Expand Up @@ -48,3 +51,75 @@ def read_csv(
"""
+ dask.dataframe.read_csv.__doc__
)


class CSVAdapter:
    """Adapter for a table stored as one CSV file per partition.

    Reads are delegated to a lazily constructed ``DataFrameAdapter``;
    writes go directly to the partition files via pandas.
    """

    structure_family = StructureFamily.table

    def __init__(
        self,
        data_uris,
        structure,
        metadata=None,
        specs=None,
        access_policy=None,
    ):
        # TODO Store data_uris instead and generalize to non-file schemes.
        self._partition_paths = []
        for uri in data_uris:
            self._partition_paths.append(path_from_uri(uri))
        self._metadata = metadata if metadata else {}
        self._structure = structure
        self.specs = list(specs) if specs else []
        self.access_policy = access_policy

    def metadata(self):
        """Return the metadata dict associated with this node."""
        return self._metadata

    @property
    def dataframe_adapter(self):
        """Build a DataFrameAdapter over the partition files.

        Partitions whose file does not exist yet (not written) are
        represented as ``None``.
        """
        partitions = [
            dask.dataframe.read_csv(path) if Path(path).exists() else None
            for path in self._partition_paths
        ]
        return DataFrameAdapter(partitions, self._structure)

    @classmethod
    def init_storage(cls, data_uri, structure):
        """Create the storage directory and declare one Asset per partition."""
        from ..server.schemas import Asset

        directory = path_from_uri(data_uri)
        directory.mkdir(parents=True, exist_ok=True)
        assets = []
        for i in range(structure.npartitions):
            assets.append(
                Asset(
                    data_uri=f"{data_uri}/partition-{i}.csv",
                    is_directory=False,
                    parameter="data_uris",
                    num=i,
                )
            )
        return assets

    def write_partition(self, data, partition):
        """Write one partition's DataFrame to its CSV file."""
        target = self._partition_paths[partition]
        data.to_csv(target, index=False)

    def write(self, data):
        """Write a single-partition table; multi-partition writes unsupported."""
        if self.structure().npartitions != 1:
            raise NotImplementedError
        target = self._partition_paths[0]
        data.to_csv(target, index=False)

    def read(self, *args, **kwargs):
        """Read the full table (delegates to the dataframe adapter)."""
        return self.dataframe_adapter.read(*args, **kwargs)

    def read_partition(self, *args, **kwargs):
        """Read a single partition (delegates to the dataframe adapter)."""
        return self.dataframe_adapter.read_partition(*args, **kwargs)

    def structure(self):
        """Return the TableStructure describing this table."""
        return self._structure

    def get(self, key):
        """Look up a column by key via the dataframe adapter."""
        return self.dataframe_adapter.get(key)
30 changes: 21 additions & 9 deletions tiled/catalog/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,21 @@
StructureFamily.table: PARQUET_MIMETYPE,
StructureFamily.sparse: SPARSE_BLOCKS_PARQUET_MIMETYPE,
}
DEFAULT_INIT_STORAGE = OneShotCachedMap(
INIT_STORAGE = OneShotCachedMap(
{
StructureFamily.array: lambda: importlib.import_module(
ZARR_MIMETYPE: lambda: importlib.import_module(
"...adapters.zarr", __name__
).ZarrArrayAdapter.init_storage,
StructureFamily.awkward: lambda: importlib.import_module(
AWKWARD_BUFFERS_MIMETYPE: lambda: importlib.import_module(
"...adapters.awkward_buffers", __name__
).AwkwardBuffersAdapter.init_storage,
StructureFamily.table: lambda: importlib.import_module(
PARQUET_MIMETYPE: lambda: importlib.import_module(
"...adapters.parquet", __name__
).ParquetDatasetAdapter.init_storage,
StructureFamily.sparse: lambda: importlib.import_module(
"text/csv": lambda: importlib.import_module(
"...adapters.csv", __name__
).CSVAdapter.init_storage,
SPARSE_BLOCKS_PARQUET_MIMETYPE: lambda: importlib.import_module(
"...adapters.sparse_blocks_parquet", __name__
).SparseBlocksParquetAdapter.init_storage,
}
Expand Down Expand Up @@ -620,17 +623,26 @@ async def create_node(
if data_source.management != Management.external:
if structure_family == StructureFamily.container:
raise NotImplementedError(structure_family)
data_source.mimetype = DEFAULT_CREATION_MIMETYPE[
data_source.structure_family
]
if data_source.mimetype is None:
data_source.mimetype = DEFAULT_CREATION_MIMETYPE[
data_source.structure_family
]
data_source.parameters = {}
data_uri_path_parts = self.segments + [key]
if structure_family == StructureFamily.union:
data_uri_path_parts.append(data_source.name)
data_uri = str(self.context.writable_storage) + "".join(
f"/{quote_plus(segment)}" for segment in data_uri_path_parts
)
init_storage = DEFAULT_INIT_STORAGE[data_source.structure_family]
if data_source.mimetype not in INIT_STORAGE:
raise HTTPException(
status_code=415,
detail=(
f"The given data source mimetype, {data_source.mimetype}, "
"is not one that the Tiled server knows how to write."
),
)
init_storage = INIT_STORAGE[data_source.mimetype]
assets = await ensure_awaitable(
init_storage, data_uri, data_source.structure
)
Expand Down
2 changes: 1 addition & 1 deletion tiled/mimetypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
).TiffSequenceAdapter.from_uris,
"text/csv": lambda: importlib.import_module(
"..adapters.csv", __name__
).read_csv,
).CSVAdapter,
XLSX_MIME_TYPE: lambda: importlib.import_module(
"..adapters.excel", __name__
).ExcelAdapter.from_uri,
Expand Down

0 comments on commit 06355c1

Please sign in to comment.