Skip to content

Commit

Permalink
Implement single-key access.
Browse files Browse the repository at this point in the history
  • Loading branch information
danielballan committed Feb 26, 2024
1 parent 23d27da commit ef033e3
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 5 deletions.
7 changes: 7 additions & 0 deletions tiled/_tests/test_writing.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,15 +582,22 @@ def test_union_two_tables_two_arrays(tree):
],
key="x",
)
# Write by data source.
x.contents["table1"].write(df1)
x.contents["table2"].write(df2)
x.contents["F"].write_block(arr1, (0, 0))
x.contents["G"].write_block(arr2, (0, 0))

# Read by data source.
x.contents["table1"].read()
x.contents["table2"].read()
x.contents["F"].read()
x.contents["G"].read()

# Read by column.
for column in ["A", "B", "C", "D", "E", "F", "G"]:
x[column].read()


def test_union_table_column_array_key_collision(tree):
with Context.from_app(build_app(tree)) as context:
Expand Down
3 changes: 3 additions & 0 deletions tiled/adapters/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,6 @@ def read_partition(self, *args, **kwargs):

def structure(self):
    """Return the structure object held in ``self._structure``."""
    stored_structure = self._structure
    return stored_structure

def get(self, key):
    """Look up ``key`` by delegating to the wrapped ``dataframe_adapter``."""
    wrapped = self.dataframe_adapter
    return wrapped.get(key)
5 changes: 5 additions & 0 deletions tiled/adapters/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ def __getitem__(self, key):
# Must compute to determine shape.
return ArrayAdapter.from_array(self.read([key])[key].values)

def get(self, key):
    """
    Return an array adapter for the column ``key``, or None if absent.

    Unlike item access, a key that is not among the structure's columns
    yields None instead of raising.
    """
    if key in self.structure().columns:
        # Reading the single column is required to obtain its values.
        column = self.read([key])[key]
        return ArrayAdapter.from_array(column.values)
    return None

def items(self):
yield from (
(key, ArrayAdapter.from_array(self.read([key])[key].values))
Expand Down
20 changes: 18 additions & 2 deletions tiled/catalog/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,10 @@ async def lookup_adapter(

for i in range(len(segments)):
catalog_adapter = await self.lookup_adapter(segments[:i])
if (catalog_adapter.structure_family == StructureFamily.union) and len(
segments[i:]
) == 1:
return await ensure_awaitable(catalog_adapter.get, segments[-1])
if catalog_adapter.data_sources:
adapter = await catalog_adapter.get_adapter()
for segment in segments[i:]:
Expand Down Expand Up @@ -969,6 +973,9 @@ class CatalogSparseAdapter(CatalogArrayAdapter):


class CatalogTableAdapter(CatalogNodeAdapter):
async def get(self, *args, **kwargs):
    """Resolve the underlying adapter and forward all arguments to its ``get``."""
    adapter = await self.get_adapter()
    return adapter.get(*args, **kwargs)

async def read(self, *args, **kwargs):
    """
    Resolve the underlying adapter and forward all arguments to its ``read``.

    The call goes through ``ensure_awaitable`` and its result is awaited.
    """
    adapter = await self.get_adapter()
    return await ensure_awaitable(adapter.read, *args, **kwargs)

Expand All @@ -987,8 +994,17 @@ async def write_partition(self, *args, **kwargs):


class CatalogUnionAdapter(CatalogNodeAdapter):
async def get(self, key):
    """
    Look up a single key within this union node.

    A key may name a column of one of the union's table data sources, or
    it may be the name of a data source itself.

    Parameters
    ----------
    key : str
        Column name or data source name to look up.

    Returns
    -------
    The result of the matching table's ``get(key)``, the object returned by
    ``for_data_source`` when ``key`` equals a data source name, or None
    when nothing matches.
    """
    if key not in self.structure().all_keys:
        return None
    for data_source in self.data_sources:
        # Table data sources are searched by column name first.
        if data_source.structure_family == StructureFamily.table:
            if key in data_source.structure.columns:
                return await ensure_awaitable(
                    self.for_data_source(data_source.name).get, key
                )
        # Any data source can also be addressed directly by its own name.
        if key == data_source.name:
            return self.for_data_source(data_source.name)
    # Listed in all_keys but not matched by any data source.
    return None

def for_data_source(self, data_source_name):
for data_source in self.data_sources:
Expand Down
28 changes: 26 additions & 2 deletions tiled/client/union.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import copy

from .base import STRUCTURE_TYPES, BaseClient
from .utils import client_for_item
from .utils import MSGPACK_MIME_TYPE, ClientError, client_for_item, handle_error


class UnionClient(BaseClient):
Expand All @@ -19,7 +19,31 @@ def contents(self):
def __getitem__(self, key):
    """
    Fetch one key of this union node from the server and wrap it in a client.

    Parameters
    ----------
    key : str
        Must be one of ``self.structure().all_keys``.

    Returns
    -------
    The client built by ``client_for_item`` from the ``"data"`` entry of
    the server's response.

    Raises
    ------
    KeyError
        If ``key`` is not in ``all_keys`` or the server responds with 404.
    ClientError
        Re-raised for any other client error status.
    """
    if key not in self.structure().all_keys:
        raise KeyError(key)
    try:
        self_link = self.item["links"]["self"]
        # Drop one trailing slash so the joined URL has no doubled "/".
        if self_link.endswith("/"):
            self_link = self_link[:-1]
        params = {}
        if self._include_data_sources:
            params["include_data_sources"] = True
        content = handle_error(
            self.context.http_client.get(
                f"{self_link}/{key}",
                headers={"Accept": MSGPACK_MIME_TYPE},
                params=params,
            )
        ).json()
    except ClientError as err:
        # A 404 is reported as a missing key, matching mapping semantics.
        if err.response.status_code == 404:
            raise KeyError(key)
        raise
    item = content["data"]
    return client_for_item(
        self.context,
        self.structure_clients,
        item,
        include_data_sources=self._include_data_sources,
    )


class UnionContents:
Expand Down
2 changes: 1 addition & 1 deletion tiled/serialization/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def serialize_csv(df, metadata, preserve_index=False):
def deserialize_csv(buffer):
    """
    Parse CSV-encoded bytes into a pandas DataFrame.

    Parameters
    ----------
    buffer : bytes
        Raw CSV content.

    Returns
    -------
    pandas.DataFrame
    """
    # Imported here so pandas is only loaded when CSV is actually deserialized.
    import pandas

    # `read_csv` has no `headers` keyword (the option is spelled `header`).
    # It is left at its default, so column names come from the first row.
    return pandas.read_csv(io.BytesIO(buffer))


# Register serialize_csv as the "text/csv" serializer for the table structure family.
serialization_registry.register(StructureFamily.table, "text/csv", serialize_csv)
Expand Down

0 comments on commit ef033e3

Please sign in to comment.