Commit: some more refactoring

Seher Karakuzu authored and Seher Karakuzu committed Jul 25, 2024
1 parent 5e9e93e commit e584c95
Showing 4 changed files with 31 additions and 80 deletions.
12 changes: 8 additions & 4 deletions CHANGELOG.md
@@ -5,6 +5,10 @@ Write the date in place of the "Unreleased" in the case a new version is released.

## Unreleased

### Added

- Support for `FullText` search on SQLite-backed catalogs
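
(Aside: the `FullText` entry above maps to the Tiled client-side query API. A minimal usage sketch, assuming a locally running server backed by an SQLite catalog; the URL and search term are illustrative:)

    from tiled.client import from_uri
    from tiled.queries import FullText

    # Assumed local deployment; substitute your own server URL.
    client = from_uri("http://localhost:8000")
    # Returns a lazy container of nodes whose metadata matches the text.
    results = client.search(FullText("temperature"))
    print(list(results))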

### Fixed
- A bug in `Context.__getstate__` caused pickling to fail if applied twice.

@@ -34,6 +38,10 @@ Write the date in place of the "Unreleased" in the case a new version is release
potential code injection attacks due to current or future bugs in Tiled or
its upstream dependencies.

### Added

- Added an `Arrow` adapter which supports reading/writing Arrow tables via `RecordBatchFileReader`/`RecordBatchFileWriter`.
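
(Aside: the `Arrow` adapter entry above builds on pyarrow's IPC file format. A sketch of the underlying round-trip, with an illustrative file name:)

    import pyarrow

    batch = pyarrow.RecordBatch.from_pydict({"x": [1, 2, 3]})
    # RecordBatchFileWriter, obtained via pyarrow.ipc.new_file
    with pyarrow.ipc.new_file("example.arrow", batch.schema) as writer:
        writer.write_batch(batch)
    # RecordBatchFileReader, obtained via pyarrow.ipc.open_file
    reader = pyarrow.ipc.open_file("example.arrow")
    table = reader.read_all()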

## v0.1.0b3 (2024-06-04)

### Added
@@ -62,10 +70,6 @@ Write the date in place of the "Unreleased" in the case a new version is release

## v0.1.0b1 (2024-05-25)

### Added

- Support for `FullText` search on SQLite-backed catalogs

### Fixed

- Updated `BaseClient.formats` to use the `dict` structure for specs.
94 changes: 23 additions & 71 deletions tiled/adapters/arrow.py
@@ -1,8 +1,6 @@
from pathlib import Path
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union

import dask.dataframe
import pandas
import pyarrow
import pyarrow.feather as feather
import pyarrow.fs
@@ -16,54 +14,6 @@
from .type_alliases import JSON


class ReaderHandle:
"""Class to provide handle to read the data via ArrowAdapter."""

def __init__(
self,
partitions: Union[dask.dataframe.DataFrame, pandas.DataFrame],
) -> None:
"""
Class to create a new instance of read_all function.
Parameters
----------
partitions : the partitions
"""
self._partitions = list(partitions)

def read(self) -> pyarrow.table:
"""
The concatenated data from given set of partitions as pyarrow table.
Parameters
----------
Returns
-------
Returns the concatenated pyarrow table.
"""
print("Ever in adapters/table read????", len(self._partitions))
if any(p is None for p in self._partitions):
raise ValueError("Not all partitions have been stored.")

return pyarrow.concat_tables(
[partition.read_all() for partition in self._partitions]
)

def read_partition_with_batch(self, partition: int, batch: int) -> pyarrow.table:
"""
Function to read a batch of data from a given parititon.
Parameters
----------
partition : the index of the partition to read.
batch : the index of the batch to read.
Returns
-------
The pyarrow table corresponding to a given partition and batch.
"""
df = self._partitions[partition]
return df.get_batch(batch)


class ArrowAdapter:
""" """

@@ -255,7 +205,7 @@ def items(self) -> Iterator[Tuple[str, ArrayAdapter]]:
)

@property
def reader_handle(self) -> ReaderHandle:
def reader_handle(self) -> List[pyarrow.RecordBatchFileReader]:
"""
Function to initialize and return the reader handle.
Returns
@@ -272,7 +222,7 @@ def reader_handle(self) -> ReaderHandle:
# with pyarrow.ipc.open_stream(path) as reader:
partition = reader
partitions.append(partition)
return ReaderHandle(partitions)
return partitions
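
(Aside: the property now hands back the raw list of `pyarrow.RecordBatchFileReader` objects rather than a wrapper class. A standalone sketch of building such a list, with a hypothetical helper name and illustrative paths:)

    import pyarrow

    def open_partition_readers(paths):
        # One RecordBatchFileReader per partition file; read() concatenates
        # them and read_partition() indexes into them.
        return [pyarrow.ipc.open_file(path) for path in paths]

    readers = open_partition_readers(["part-0.arrow", "part-1.arrow"])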

def write_partition(
self,
@@ -290,10 +240,9 @@ def write_partition(
-------
"""
if isinstance(data, list):
schema = data[0].schema
else:
schema = data.schema
if not isinstance(data, list):
data = list(data)
schema = data[0].schema

uri = self._partition_paths[partition]

@@ -315,10 +264,9 @@ def write(
-------
"""
if isinstance(data, list):
schema = data[0].schema
else:
schema = data.schema
if not isinstance(data, list):
data = list(data)
schema = data[0].schema

if self.structure().npartitions != 1:
raise NotImplementedError
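
(Aside: both write paths now coerce their input to a list of record batches and take the schema from the first element. A standalone sketch of that normalization, with a hypothetical helper name:)

    def normalize_batches(data):
        # Accept a list of record batches or any iterable of them;
        # return the list plus the schema shared by every batch.
        if not isinstance(data, list):
            data = list(data)
        return data, data[0].schema
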
@@ -331,28 +279,32 @@

def read(self, *args: Any, **kwargs: Any) -> pyarrow.table:
"""
Function to read all the partitions of the data.
The concatenated data from given set of partitions as pyarrow table.
Parameters
----------
args : any extra arguments to be unpacked into the function.
kwargs : any extra keyword arguments to be unpacked into the function.
Returns
-------
The whole content of the file as pyarrow table.
Returns the concatenated pyarrow table.
"""
return self.reader_handle.read(*args, **kwargs)

def read_partition(self, *args: Any, **kwargs: Any) -> pyarrow.table:
if any(p is None for p in self.reader_handle):
raise ValueError("Not all partitions have been stored.")

return pyarrow.concat_tables(
[partition.read_all() for partition in self.reader_handle]
)

def read_partition(self, partition: int, batch: int) -> pyarrow.table:
"""
Function to read a batch of data from a given partition.
Parameters
----------
args : any extra arguments to be unpacked into the function.
kwargs : any extra keyword arguments to be unpacked into the function.
partition : the index of the partition to read.
batch : the index of the batch to read.
Returns
-------
The pyarrow table corresponding to given partition and batch.
The pyarrow table corresponding to a given partition and batch.
"""
return self.reader_handle.read_partition_with_batch(*args, **kwargs)
df = self.reader_handle[partition]
return df.get_batch(batch)
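
(Aside: the rewritten read paths above use pyarrow directly. The same pattern outside the adapter, with illustrative file names:)

    import pyarrow

    readers = [pyarrow.ipc.open_file(p) for p in ("part-0.arrow", "part-1.arrow")]

    # read(): concatenate every partition into a single table
    full_table = pyarrow.concat_tables([r.read_all() for r in readers])

    # read_partition(partition, batch): one record batch from one partition
    first_batch = readers[0].get_batch(0)
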
2 changes: 0 additions & 2 deletions tiled/adapters/table.py
@@ -50,7 +50,6 @@ def from_pandas(
-------
"""
print("HHHHHHHHere in table adapter")
ddf = dask.dataframe.from_pandas(*args, npartitions=npartitions, **kwargs)
if specs is None:
specs = [Spec("dataframe")]
@@ -181,7 +180,6 @@ def read(
-------
"""
print("Ever in adapters/table read????", len(self._partitions))
if any(p is None for p in self._partitions):
raise ValueError("Not all partitions have been stored.")
if isinstance(self._partitions[0], dask.dataframe.DataFrame):
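
(Aside: the surrounding context in this adapter builds its partitions with dask. A minimal sketch of that conversion, with illustrative data:)

    import dask.dataframe
    import pandas

    df = pandas.DataFrame({"x": [1, 2, 3, 4]})
    ddf = dask.dataframe.from_pandas(df, npartitions=2)
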
3 changes: 0 additions & 3 deletions tiled/structures/table.py
@@ -40,7 +40,6 @@ def from_dask_dataframe(cls, ddf) -> "TableStructure":

@classmethod
def from_pandas(cls, df):
print("HEREEEEEE in table structure")
import pyarrow

schema_bytes = pyarrow.Table.from_pandas(df).schema.serialize()
@@ -50,7 +49,6 @@ def from_pandas(cls, df):

@classmethod
def from_arrays(cls, arr, names):
print("HEREEEEEE in table structure arrowy")
import pyarrow

schema_bytes = pyarrow.Table.from_arrays(arr, names).schema.serialize()
@@ -60,7 +58,6 @@ def from_arrays(cls, arr, names):

@classmethod
def from_arrow_table(cls, tble) -> "TableStructure":
print("HEREEEEEE in table structure arrowy")
schema_bytes = tble.schema.serialize()
schema_b64 = base64.b64encode(schema_bytes).decode("utf-8")
data_uri = B64_ENCODED_PREFIX + schema_b64
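
(Aside: these classmethods all funnel a pyarrow schema through base64, as the context lines above show. A minimal round-trip sketch, with an illustrative schema:)

    import base64
    import pyarrow

    schema = pyarrow.schema([("x", pyarrow.int64())])
    schema_b64 = base64.b64encode(schema.serialize()).decode("utf-8")

    # Recover the schema from its base64 form.
    recovered = pyarrow.ipc.read_schema(
        pyarrow.py_buffer(base64.b64decode(schema_b64))
    )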
