[CHORE] Remove user-facing arguments for casting to Ray's tensor type (#2802)

## Summary

Cleanup PR.

1. Removes `cast_tensors_to_ray_tensor_dtype` as a user-facing argument
from our export methods (e.g. `to_arrow`, `to_pandas`, etc.). This flag is
only ever needed when converting a Daft DataFrame to a Ray Dataset, so
there is no reason to expose it to users (see the usage sketch after this
list).
2. Instead, the logic for casting `daft.DataType.tensor` data to a Ray
Data tensor type now lives inside the Ray Data conversion code
(`_make_ray_block_from_micropartition`). This keeps the messiness of that
conversion contained rather than threading it through all of our
`to_arrow` logic; a standalone sketch of the conversion follows the
`daft/runners/ray_runner.py` diff below.
3. Also removes `_trim_pyarrow_large_arrays`, a legacy code path that is
no longer hit.
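
A rough sketch of the user-facing effect (illustrative only, not part of this
diff; column names and data are made up):

```python
import daft

df = daft.from_pydict({"x": [1.0, 2.0, 3.0]})

# Export methods no longer accept cast_tensors_to_ray_tensor_dtype.
arrow_table = df.to_arrow()
pandas_df = df.to_pandas()

# Casting to Ray's tensor extension types now happens only inside the
# Ray Data conversion path, e.g. when calling df.to_ray_dataset().
```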

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
jaychia and Jay Chia committed Sep 7, 2024
1 parent e69b089 commit 3c2af5a
Showing 9 changed files with 78 additions and 172 deletions.
16 changes: 9 additions & 7 deletions daft/dataframe/dataframe.py
@@ -2462,14 +2462,11 @@ def __contains__(self, col_name: str) -> bool:
return col_name in self.column_names

@DataframePublicAPI
def to_pandas(
self, cast_tensors_to_ray_tensor_dtype: bool = False, coerce_temporal_nanoseconds: bool = False
) -> "pandas.DataFrame":
def to_pandas(self, coerce_temporal_nanoseconds: bool = False) -> "pandas.DataFrame":
"""Converts the current DataFrame to a `pandas DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__.
If results have not computed yet, collect will be called.
Args:
cast_tensors_to_ray_tensor_dtype (bool): Whether to cast tensors to Ray tensor dtype. Defaults to False.
coerce_temporal_nanoseconds (bool): Whether to coerce temporal columns to nanoseconds. Only applicable to pandas version >= 2.0 and pyarrow version >= 13.0.0. Defaults to False. See `pyarrow.Table.to_pandas <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pandas>`__ for more information.
Returns:
@@ -2484,13 +2481,12 @@ def to_pandas(

pd_df = result.to_pandas(
schema=self._builder.schema(),
cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype,
coerce_temporal_nanoseconds=coerce_temporal_nanoseconds,
)
return pd_df

@DataframePublicAPI
def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> "pyarrow.Table":
def to_arrow(self) -> "pyarrow.Table":
"""Converts the current DataFrame to a `pyarrow Table <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html>`__.
If results have not computed yet, collect will be called.
@@ -2500,11 +2496,17 @@ def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> "pyarrow.T
.. NOTE::
This call is **blocking** and will execute the DataFrame when called
"""
for name in self.schema().column_names():
if self.schema()[name].dtype._is_python_type():
raise ValueError(
f"Cannot convert column {name} to Arrow type, found Python type: {self.schema()[name].dtype}"
)

self.collect()
result = self._result
assert result is not None

return result.to_arrow(cast_tensors_to_ray_tensor_dtype)
return result.to_arrow()

@DataframePublicAPI
def to_pydict(self) -> Dict[str, List[Any]]:
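
For context, a minimal sketch of what the new guard in `DataFrame.to_arrow()`
above means for Python-typed columns (hypothetical column names; assumes
`Expression.apply` with `return_dtype=daft.DataType.python()`):

```python
import daft

df = daft.from_pydict({"x": [1, 2, 3]}).with_column(
    # Declare a column with Daft's Python dtype; Arrow has no equivalent type.
    "obj",
    daft.col("x").apply(lambda v: {"wrapped": v}, return_dtype=daft.DataType.python()),
)

try:
    df.to_arrow()
except ValueError as err:
    # The new check raises before collecting, naming the offending column.
    print(err)
```
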
4 changes: 2 additions & 2 deletions daft/datatype.py
@@ -462,8 +462,8 @@ def from_numpy_dtype(cls, np_type: np.dtype) -> DataType:
arrow_type = pa.from_numpy_dtype(np_type)
return cls.from_arrow_type(arrow_type)

def to_arrow_dtype(self, cast_tensor_to_ray_type: builtins.bool = False) -> pa.DataType:
return self._dtype.to_arrow(cast_tensor_to_ray_type)
def to_arrow_dtype(self) -> pa.DataType:
return self._dtype.to_arrow()

@classmethod
def python(cls) -> DataType:
6 changes: 2 additions & 4 deletions daft/runners/partitioning.py
@@ -216,19 +216,17 @@ def to_pydict(self) -> dict[str, list[Any]]:
def to_pandas(
self,
schema: Schema | None = None,
cast_tensors_to_ray_tensor_dtype: bool = False,
coerce_temporal_nanoseconds: bool = False,
) -> pd.DataFrame:
merged_partition = self._get_merged_micropartition()
return merged_partition.to_pandas(
schema=schema,
cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype,
coerce_temporal_nanoseconds=coerce_temporal_nanoseconds,
)

def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> pa.Table:
def to_arrow(self) -> pa.Table:
merged_partition = self._get_merged_micropartition()
return merged_partition.to_arrow(cast_tensors_to_ray_tensor_dtype)
return merged_partition.to_arrow()

def items(self) -> list[tuple[PartID, MaterializedResult[PartitionT]]]:
"""
37 changes: 36 additions & 1 deletion daft/runners/ray_runner.py
@@ -74,6 +74,12 @@

RAY_VERSION = tuple(int(s) for s in ray.__version__.split(".")[0:3])

_RAY_DATA_ARROW_TENSOR_TYPE_AVAILABLE = True
try:
from ray.data.extensions import ArrowTensorArray, ArrowTensorType
except ImportError:
_RAY_DATA_ARROW_TENSOR_TYPE_AVAILABLE = False


@ray.remote
def _glob_path_into_file_infos(
@@ -95,7 +101,36 @@ def _glob_path_into_file_infos(
@ray.remote
def _make_ray_block_from_micropartition(partition: MicroPartition) -> RayDatasetBlock:
try:
return partition.to_arrow(cast_tensors_to_ray_tensor_dtype=True)
daft_schema = partition.schema()
arrow_tbl = partition.to_arrow()

# Convert arrays to Ray Data's native ArrowTensorType arrays
new_arrs = {}
for idx, field in enumerate(arrow_tbl.schema):
if daft_schema[field.name].dtype._is_fixed_shape_tensor_type():
assert isinstance(field.type, pa.FixedShapeTensorType)
new_dtype = ArrowTensorType(field.type.shape, field.type.value_type)
arrow_arr = arrow_tbl[field.name].combine_chunks()
storage_arr = arrow_arr.storage
list_size = storage_arr.type.list_size
new_storage_arr = pa.ListArray.from_arrays(
pa.array(
list(range(0, (len(arrow_arr) + 1) * list_size, list_size)),
pa.int32(),
),
storage_arr.values,
)
new_arrs[idx] = (
field.name,
pa.ExtensionArray.from_storage(new_dtype, new_storage_arr),
)
elif daft_schema[field.name].dtype._is_tensor_type():
assert isinstance(field.type, pa.ExtensionType)
new_arrs[idx] = (field.name, ArrowTensorArray.from_numpy(partition.get_column(field.name).to_pylist()))
for idx, (field_name, arr) in new_arrs.items():
arrow_tbl = arrow_tbl.set_column(idx, pa.field(field_name, arr.type), arr)

return arrow_tbl
except pa.ArrowInvalid:
return partition.to_pylist()

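
For context, a minimal standalone sketch of the fixed-shape tensor re-wrapping
that the new `_make_ray_block_from_micropartition` performs (made-up data and
shapes; assumes `ray` and `pyarrow` are installed):

```python
import pyarrow as pa
from ray.data.extensions import ArrowTensorType

# Storage for two 2x2 float tensors: a fixed-size list array of 4 values each.
values = pa.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0], pa.float32())
storage = pa.FixedSizeListArray.from_arrays(values, 4)

# Ray's ArrowTensorType is backed by a variable-size list array, so rebuild the
# storage with explicit int32 offsets (0, 4, 8) over the same flat values.
list_size = storage.type.list_size
offsets = pa.array(
    list(range(0, (len(storage) + 1) * list_size, list_size)), pa.int32()
)
new_storage = pa.ListArray.from_arrays(offsets, storage.values)

ray_dtype = ArrowTensorType((2, 2), pa.float32())
tensor_arr = pa.ExtensionArray.from_storage(ray_dtype, new_storage)
print(tensor_arr.type, len(tensor_arr))  # two 2x2 tensor elements
```
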
38 changes: 9 additions & 29 deletions daft/series.py
@@ -12,7 +12,6 @@
_RAY_DATA_EXTENSIONS_AVAILABLE = True
try:
from ray.data.extensions import (
ArrowTensorArray,
ArrowTensorType,
ArrowVariableShapedTensorType,
)
@@ -242,40 +241,21 @@ def rename(self, name: str) -> Series:
def datatype(self) -> DataType:
return DataType._from_pydatatype(self._series.data_type())

def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> pa.Array:
def to_arrow(self) -> pa.Array:
"""
Convert this Series to an pyarrow array.
"""
dtype = self.datatype()
if cast_tensors_to_ray_tensor_dtype and (dtype._is_tensor_type() or dtype._is_fixed_shape_tensor_type()):
if not _RAY_DATA_EXTENSIONS_AVAILABLE:
raise ValueError("Trying to convert tensors to Ray tensor dtypes, but Ray is not installed.")
pyarrow_dtype = dtype.to_arrow_dtype(cast_tensor_to_ray_type=True)
if isinstance(pyarrow_dtype, ArrowTensorType):
assert dtype._is_fixed_shape_tensor_type()
arrow_series = self._series.to_arrow()
storage = arrow_series.storage
list_size = storage.type.list_size
storage = pa.ListArray.from_arrays(
pa.array(
list(range(0, (len(arrow_series) + 1) * list_size, list_size)),
pa.int32(),
),
storage.values,
)
return pa.ExtensionArray.from_storage(pyarrow_dtype, storage)
else:
# Variable-shaped tensor columns can't be converted directly to Ray's variable-shaped tensor extension
# type since it expects all tensor elements to have the same number of dimensions, which Daft does not enforce.
# TODO(Clark): Convert directly to Ray's variable-shaped tensor extension type when all tensor
# elements have the same number of dimensions, without going through pylist roundtrip.
return ArrowTensorArray.from_numpy(self.to_pylist())
elif dtype._is_fixed_shape_tensor_type() and pyarrow_supports_fixed_shape_tensor():
pyarrow_dtype = dtype.to_arrow_dtype(cast_tensor_to_ray_type=False)
arrow_arr = self._series.to_arrow()

# Special-case for PyArrow FixedShapeTensor if it is supported by the version of PyArrow
# TODO: Push this down into self._series.to_arrow()?
if dtype._is_fixed_shape_tensor_type() and pyarrow_supports_fixed_shape_tensor():
pyarrow_dtype = dtype.to_arrow_dtype()
arrow_series = self._series.to_arrow()
return pa.ExtensionArray.from_storage(pyarrow_dtype, arrow_series.storage)
else:
return self._series.to_arrow()

return arrow_arr

def to_pylist(self) -> list:
"""
8 changes: 2 additions & 6 deletions daft/table/micropartition.py
@@ -139,10 +139,8 @@ def slice(self, start: int, end: int) -> MicroPartition:
def to_table(self) -> Table:
return Table._from_pytable(self._micropartition.to_table())

def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False, convert_large_arrays: bool = False) -> pa.Table:
return self.to_table().to_arrow(
cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype, convert_large_arrays=convert_large_arrays
)
def to_arrow(self) -> pa.Table:
return self.to_table().to_arrow()

def to_pydict(self) -> dict[str, list]:
return self.to_table().to_pydict()
@@ -153,12 +151,10 @@ def to_pylist(self) -> list[dict[str, Any]]:
def to_pandas(
self,
schema: Schema | None = None,
cast_tensors_to_ray_tensor_dtype: bool = False,
coerce_temporal_nanoseconds: bool = False,
) -> pd.DataFrame:
return self.to_table().to_pandas(
schema=schema,
cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype,
coerce_temporal_nanoseconds=coerce_temporal_nanoseconds,
)

75 changes: 9 additions & 66 deletions daft/table/table.py
@@ -29,20 +29,13 @@
from daft.logical.schema import Schema
from daft.series import Series, item_to_series

_NUMPY_AVAILABLE = True
try:
import numpy as np
except ImportError:
_NUMPY_AVAILABLE = False

_PANDAS_AVAILABLE = True
try:
import pandas as pd
except ImportError:
_PANDAS_AVAILABLE = False

if TYPE_CHECKING:
import numpy as np
import pandas as pd
import pyarrow as pa

@@ -176,36 +169,9 @@ def to_table(self) -> Table:
"""For compatibility with MicroPartition"""
return self

def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False, convert_large_arrays: bool = False) -> pa.Table:
python_fields = set()
tensor_fields = set()
for field in self.schema():
if field.dtype._is_python_type():
python_fields.add(field.name)
elif field.dtype._is_tensor_type() or field.dtype._is_fixed_shape_tensor_type():
tensor_fields.add(field.name)
if python_fields or tensor_fields:
table = {}
for colname in self.column_names():
column_series = self.get_column(colname)
if colname in python_fields:
column = column_series.to_pylist()
else:
column = column_series.to_arrow(cast_tensors_to_ray_tensor_dtype)
table[colname] = column

tab = pa.Table.from_pydict(table)
else:
tab = pa.Table.from_batches([self._table.to_arrow_record_batch()])

if not convert_large_arrays:
return tab

new_columns = []
for col in tab.columns:
new_columns.append(_trim_pyarrow_large_arrays(col))

return pa.Table.from_arrays(new_columns, names=tab.column_names)
def to_arrow(self) -> pa.Table:
tab = pa.Table.from_pydict({colname: self.get_column(colname).to_arrow() for colname in self.column_names()})
return tab

def to_pydict(self) -> dict[str, list]:
return {colname: self.get_column(colname).to_pylist() for colname in self.column_names()}
@@ -220,30 +186,31 @@ def to_pandas(
def to_pandas(
self,
schema: Schema | None = None,
cast_tensors_to_ray_tensor_dtype: bool = False,
coerce_temporal_nanoseconds: bool = False,
) -> pd.DataFrame:
from packaging.version import parse

if not _PANDAS_AVAILABLE:
raise ImportError("Unable to import Pandas - please ensure that it is installed.")

python_fields = set()
tensor_fields = set()
for field in self.schema():
if field.dtype._is_python_type():
python_fields.add(field.name)
elif field.dtype._is_tensor_type() or field.dtype._is_fixed_shape_tensor_type():
tensor_fields.add(field.name)

if python_fields or tensor_fields:
# Use Python list representation for Python typed columns.
table = {}
for colname in self.column_names():
column_series = self.get_column(colname)
if colname in python_fields or (colname in tensor_fields and not cast_tensors_to_ray_tensor_dtype):
# Use Python list representation for Python typed columns or tensor columns (return as numpy)
if colname in python_fields or colname in tensor_fields:
column = column_series.to_pylist()
else:
# Arrow-native field, so provide column as Arrow array.
column_arrow = column_series.to_arrow(cast_tensors_to_ray_tensor_dtype)
column_arrow = column_series.to_arrow()
if parse(pa.__version__) < parse("13.0.0"):
column = column_arrow.to_pandas()
else:
@@ -252,7 +219,7 @@ def to_pandas(

return pd.DataFrame.from_dict(table)
else:
arrow_table = self.to_arrow(cast_tensors_to_ray_tensor_dtype)
arrow_table = self.to_arrow()
if parse(pa.__version__) < parse("13.0.0"):
return arrow_table.to_pandas()
else:
@@ -559,30 +526,6 @@ def read_json(
)


def _trim_pyarrow_large_arrays(arr: pa.ChunkedArray) -> pa.ChunkedArray:
if pa.types.is_large_binary(arr.type) or pa.types.is_large_string(arr.type):
if pa.types.is_large_binary(arr.type):
target_type = pa.binary()
else:
target_type = pa.string()

all_chunks = []
for chunk in arr.chunks:
if len(chunk) == 0:
continue
offsets = np.frombuffer(chunk.buffers()[1], dtype=np.int64)
if offsets[-1] < 2**31:
all_chunks.append(chunk.cast(target_type))
else:
raise ValueError(
f"Can not convert {arr.type} into {target_type} due to the offset array being too large: {offsets[-1]}. Maximum: {2**31}"
)

return pa.chunked_array(all_chunks, type=target_type)
else:
return arr


def read_parquet_into_pyarrow(
path: str,
columns: list[str] | None = None,