[BUG] Raise error when Ray Data tensor cannot be pickled and disable compliant nested types (#2428)

This fixes some of our flaky unit tests that try to read a Ray Dataset tensor type under a PyArrow version that does not serialize it properly. It adds a new exception when we detect this issue, and it also updates our PyArrow dependency pins to 13.0.0.
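
For illustration, a minimal sketch of the version-gated check this PR introduces (the real logic lives in `DataFrame._from_ray_dataset` in the diff below; the helper name here is illustrative, not part of the Daft API):

```python
import pyarrow as pa
from packaging.version import parse

MIN_PYARROW_FOR_RAY_TENSORS = "13.0.0"


def raise_if_ray_tensor_unsupported(error: Exception) -> None:
    """Re-raise Ray task failures caused by unpicklable tensor extension types with a clearer message."""
    failed_on_tensor_extension = "extension<arrow.fixed_shape_tensor>" in str(error)
    if failed_on_tensor_extension and parse(pa.__version__) < parse(MIN_PYARROW_FOR_RAY_TENSORS):
        raise ValueError(
            f"Reading Ray Dataset tensors is only supported with PyArrow >= "
            f"{MIN_PYARROW_FOR_RAY_TENSORS}, found {pa.__version__}. See this issue for "
            "more information: https://github.com/apache/arrow/pull/35933"
        ) from error
    raise error
```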

I also found a separate bug while upgrading PyArrow: versions >= 13.0.0 write compliant Parquet nested type names by default (see apache/arrow#35146), which causes a discrepancy between the Parquet and Arrow schemas for extension types and leads to incorrect conversion from Parquet to Arrow when reading a file back. This PR disables compliant nested type names explicitly on our write paths.
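
As a sketch of what "disables that explicitly" looks like (mirroring the `daft/table/table_io.py` hunks below):

```python
import pyarrow.dataset as pads

# Parquet write options as used on Daft's write paths after this change.
file_format = pads.ParquetFileFormat()
write_opts = file_format.make_write_options(
    compression="zstd",               # compression used on one of the write paths
    use_compliant_nested_type=False,  # keep Arrow's nested field names (e.g. "item") rather
                                      # than the compliant "element" naming that pyarrow
                                      # >= 13.0.0 writes by default
)
# write_opts is then passed to the dataset writer together with file_format.
```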

Finally, I added a `coerce_temporal_nanoseconds` parameter to `to_pandas` to restore its pre-PyArrow-13.0.0 behavior, which makes the SQL integration tests pass again.
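
A usage sketch of the new parameter, as exercised in the SQL integration tests below (the connection string is a placeholder):

```python
import daft

# Read from any SQL source with temporal columns; the connection string here is illustrative.
df = daft.read_sql("SELECT * FROM my_table", "sqlite:///example.db")

# Coerce temporal columns back to nanosecond precision (the pre-pyarrow-13 behavior),
# so comparisons against pandas DataFrames with datetime64[ns] columns line up.
pdf = df.to_pandas(coerce_temporal_nanoseconds=True)
```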

Confirmed in local testing that the proper error is raised.
kevinzwang authored Jun 22, 2024
1 parent 4951842 commit b7e6e41
Showing 11 changed files with 81 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
@@ -22,7 +22,7 @@ jobs:
matrix:
python-version: ['3.8', '3.10']
daft-runner: [py, ray]
pyarrow-version: [7.0.0, 12.0.1]
pyarrow-version: [7.0.0, 13.0.0]
enable-aqe: [0, 1]
os: [ubuntu-20.04, windows-latest]
exclude:
2 changes: 1 addition & 1 deletion benchmarking/parquet/benchmark-requirements.txt
@@ -1,5 +1,5 @@
pytest==7.4.0
pytest-benchmark==4.0.0
pytest-memray==1.4.1
pyarrow==12.0.1
pyarrow==13.0.0
boto3==1.28.3
24 changes: 22 additions & 2 deletions daft/dataframe/dataframe.py
@@ -1778,10 +1778,16 @@ def __contains__(self, col_name: str) -> bool:
return col_name in self.column_names

@DataframePublicAPI
def to_pandas(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> "pandas.DataFrame":
def to_pandas(
self, cast_tensors_to_ray_tensor_dtype: bool = False, coerce_temporal_nanoseconds: bool = False
) -> "pandas.DataFrame":
"""Converts the current DataFrame to a `pandas DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__.
If results have not computed yet, collect will be called.
Args:
cast_tensors_to_ray_tensor_dtype (bool): Whether to cast tensors to Ray tensor dtype. Defaults to False.
coerce_temporal_nanoseconds (bool): Whether to coerce temporal columns to nanoseconds. Only applicable to pandas version >= 2.0 and pyarrow version >= 13.0.0. Defaults to False. See `pyarrow.Table.to_pandas <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pandas>`__ for more information.
Returns:
pandas.DataFrame: `pandas DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__ converted from a Daft DataFrame
@@ -1795,6 +1801,7 @@ def to_pandas(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> "pandas.DataFrame":
pd_df = result.to_pandas(
schema=self._builder.schema(),
cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype,
coerce_temporal_nanoseconds=coerce_temporal_nanoseconds,
)
return pd_df

@@ -1901,6 +1908,8 @@ def to_ray_dataset(self) -> "ray.data.dataset.DataSet":
@classmethod
def _from_ray_dataset(cls, ds: "ray.data.dataset.DataSet") -> "DataFrame":
"""Creates a DataFrame from a `Ray Dataset <https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.html#ray.data.Dataset>`__."""
from ray.exceptions import RayTaskError

context = get_context()
if context.runner_config.name != "ray":
raise ValueError("Daft needs to be running on the Ray Runner for this operation")
@@ -1912,7 +1921,18 @@ def _from_ray_dataset(cls, ds: "ray.data.dataset.DataSet") -> "DataFrame":

partition_set, schema = ray_runner_io.partition_set_from_ray_dataset(ds)
cache_entry = context.runner().put_partition_set_into_cache(partition_set)
size_bytes = partition_set.size_bytes()
try:
size_bytes = partition_set.size_bytes()
except RayTaskError as e:
import pyarrow as pa
from packaging.version import parse

if "extension<arrow.fixed_shape_tensor>" in str(e) and parse(pa.__version__) < parse("13.0.0"):
raise ValueError(
f"Reading Ray Dataset tensors is only supported with PyArrow >= 13.0.0, found {pa.__version__}. See this issue for more information: https://github.com/apache/arrow/pull/35933"
) from e
raise e

num_rows = len(partition_set)
assert size_bytes is not None, "In-memory data should always have non-None size in bytes"
builder = LogicalPlanBuilder.from_in_memory_scan(
2 changes: 2 additions & 0 deletions daft/runners/partitioning.py
@@ -217,11 +217,13 @@ def to_pandas(
self,
schema: Schema | None = None,
cast_tensors_to_ray_tensor_dtype: bool = False,
coerce_temporal_nanoseconds: bool = False,
) -> pd.DataFrame:
merged_partition = self._get_merged_vpartition()
return merged_partition.to_pandas(
schema=schema,
cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype,
coerce_temporal_nanoseconds=coerce_temporal_nanoseconds,
)

def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> pa.Table:
11 changes: 9 additions & 2 deletions daft/table/micropartition.py
@@ -150,9 +150,16 @@ def to_pydict(self) -> dict[str, list]:
def to_pylist(self) -> list[dict[str, Any]]:
return self.to_table().to_pylist()

def to_pandas(self, schema: Schema | None = None, cast_tensors_to_ray_tensor_dtype: bool = False) -> pd.DataFrame:
def to_pandas(
self,
schema: Schema | None = None,
cast_tensors_to_ray_tensor_dtype: bool = False,
coerce_temporal_nanoseconds: bool = False,
) -> pd.DataFrame:
return self.to_table().to_pandas(
schema=schema, cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype
schema=schema,
cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype,
coerce_temporal_nanoseconds=coerce_temporal_nanoseconds,
)

###
21 changes: 18 additions & 3 deletions daft/table/table.py
@@ -217,7 +217,14 @@ def to_pylist(self) -> list[dict[str, Any]]:
column_names = self.column_names()
return [{colname: table[colname][i] for colname in column_names} for i in range(len(self))]

def to_pandas(self, schema: Schema | None = None, cast_tensors_to_ray_tensor_dtype: bool = False) -> pd.DataFrame:
def to_pandas(
self,
schema: Schema | None = None,
cast_tensors_to_ray_tensor_dtype: bool = False,
coerce_temporal_nanoseconds: bool = False,
) -> pd.DataFrame:
from packaging.version import parse

if not _PANDAS_AVAILABLE:
raise ImportError("Unable to import Pandas - please ensure that it is installed.")
python_fields = set()
@@ -236,12 +243,20 @@ def to_pandas(self, schema: Schema | None = None, cast_tensors_to_ray_tensor_dtype: bool = False) -> pd.DataFrame:
column = column_series.to_pylist()
else:
# Arrow-native field, so provide column as Arrow array.
column = column_series.to_arrow(cast_tensors_to_ray_tensor_dtype).to_pandas()
column_arrow = column_series.to_arrow(cast_tensors_to_ray_tensor_dtype)
if parse(pa.__version__) < parse("13.0.0"):
column = column_arrow.to_pandas()
else:
column = column_arrow.to_pandas(coerce_temporal_nanoseconds=coerce_temporal_nanoseconds)
table[colname] = column

return pd.DataFrame.from_dict(table)
else:
return self.to_arrow(cast_tensors_to_ray_tensor_dtype).to_pandas()
arrow_table = self.to_arrow(cast_tensors_to_ray_tensor_dtype)
if parse(pa.__version__) < parse("13.0.0"):
return arrow_table.to_pandas()
else:
return arrow_table.to_pandas(coerce_temporal_nanoseconds=coerce_temporal_nanoseconds)

###
# Compute methods (Table -> Table)
10 changes: 6 additions & 4 deletions daft/table/table_io.py
@@ -453,7 +453,7 @@ def write_tabular(
format = pads.ParquetFileFormat()
inflation_factor = execution_config.parquet_inflation_factor
target_file_size = execution_config.parquet_target_filesize
opts = format.make_write_options(compression=compression)
opts = format.make_write_options(compression=compression, use_compliant_nested_type=False)
elif file_format == FileFormat.Csv:
format = pads.CsvFileFormat()
opts = None
@@ -633,12 +633,14 @@ def file_visitor(written_file, protocol=protocol):

format = pads.ParquetFileFormat()

opts = format.make_write_options(compression="zstd", use_compliant_nested_type=False)

_write_tabular_arrow_table(
arrow_table=arrow_table,
schema=file_schema,
full_path=resolved_path,
format=format,
opts=format.make_write_options(compression="zstd"),
opts=opts,
fs=fs,
rows_per_file=rows_per_file,
rows_per_row_group=rows_per_row_group,
@@ -735,10 +737,10 @@ def file_visitor(written_file: Any) -> None:
target_row_groups = max(math.ceil(size_bytes / target_row_group_size / inflation_factor), 1)
rows_per_row_group = max(min(math.ceil(num_rows / target_row_groups), rows_per_file), 1)

opts = pads.ParquetFileFormat().make_write_options(use_compliant_nested_type=False)

format = pads.ParquetFileFormat()

opts = format.make_write_options(use_compliant_nested_type=False)

_write_tabular_arrow_table(
arrow_table=arrow_batch,
schema=None,
2 changes: 1 addition & 1 deletion requirements-dev.txt
@@ -34,7 +34,7 @@ Pillow==9.5.0
opencv-python==4.8.1.78

# Pyarrow
pyarrow==12.0.1
pyarrow==13.0.0
# Ray
ray[data, client]==2.7.1; python_version < '3.8'
ray[data, client]==2.10.0; python_version >= '3.8'
@@ -17,7 +17,7 @@ uvicorn==0.23.2
uvloop==0.17.0
watchfiles==0.19.0
websockets==11.0.3
pyarrow==12.0.1
pyarrow==13.0.0
slowapi==0.1.8

# Pin numpy version otherwise pyarrow doesn't work
20 changes: 10 additions & 10 deletions tests/integration/sql/test_sql.py
@@ -28,7 +28,7 @@ def test_sql_show(test_db) -> None:
def test_sql_create_dataframe_ok(test_db, pdf) -> None:
df = daft.read_sql(f"SELECT * FROM {TEST_TABLE_NAME}", test_db)

assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
@@ -40,7 +40,7 @@ def test_sql_partitioned_read(test_db, num_partitions, pdf) -> None:

df = daft.read_sql(f"SELECT * FROM {TEST_TABLE_NAME}", test_db, partition_col="id")
assert df.num_partitions() == num_partitions
assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
@@ -53,7 +53,7 @@ def test_sql_partitioned_read_with_custom_num_partitions_and_partition_col(
f"SELECT * FROM {TEST_TABLE_NAME}", test_db, partition_col=partition_col, num_partitions=num_partitions
)
assert df.num_partitions() == num_partitions
assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
@@ -66,7 +66,7 @@ def test_sql_partitioned_read_with_non_uniformly_distributed_column(test_db, num
num_partitions=num_partitions,
)
assert df.num_partitions() == num_partitions
assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
@@ -122,7 +122,7 @@ def test_sql_read_with_binary_filter_pushdowns(test_db, column, operator, value,
df = df.where(df[column] <= value)
pdf = pdf[pdf[column] <= value]

assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
@@ -133,7 +133,7 @@ def test_sql_read_with_is_null_filter_pushdowns(test_db, num_partitions, pdf) ->

pdf = pdf[pdf["null_col"].isnull()]

assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
@@ -144,7 +144,7 @@ def test_sql_read_with_not_null_filter_pushdowns(test_db, num_partitions, pdf) -

pdf = pdf[pdf["null_col"].notnull()]

assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
@@ -155,7 +155,7 @@ def test_sql_read_with_if_else_filter_pushdown(test_db, num_partitions, pdf) ->

pdf = pdf[(pdf["id"] > 100) & (pdf["float_col"] > 150) | (pdf["float_col"] < 50)]

assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
@@ -165,7 +165,7 @@ def test_sql_read_with_is_in_filter_pushdown(test_db, num_partitions, pdf) -> No
df = df.where(df["id"].is_in([1, 2, 3]))

pdf = pdf[pdf["id"].isin([1, 2, 3])]
assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
@@ -220,7 +220,7 @@ def create_conn():
return sqlalchemy.create_engine(test_db).connect()

df = daft.read_sql(f"SELECT * FROM {TEST_TABLE_NAME}", create_conn)
assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
19 changes: 10 additions & 9 deletions tests/series/test_if_else.py
@@ -428,29 +428,25 @@ def test_series_if_else_extension_type(uuid_ext_type, if_true_storage, if_false_
(
np.arange(16).reshape((4, 2, 2)),
np.arange(16, 32).reshape((4, 2, 2)),
np.array(
[[[0, 1], [2, 3]], [[20, 21], [22, 23]], [[np.nan, np.nan], [np.nan, np.nan]], [[12, 13], [14, 15]]]
),
np.array([[[0, 1], [2, 3]], [[20, 21], [22, 23]], [[12, 13], [14, 15]]]),
),
# Broadcast left
(
np.arange(4).reshape((1, 2, 2)),
np.arange(16, 32).reshape((4, 2, 2)),
np.array([[[0, 1], [2, 3]], [[20, 21], [22, 23]], [[np.nan, np.nan], [np.nan, np.nan]], [[0, 1], [2, 3]]]),
np.array([[[0, 1], [2, 3]], [[20, 21], [22, 23]], [[0, 1], [2, 3]]]),
),
# Broadcast right
(
np.arange(16).reshape((4, 2, 2)),
np.arange(16, 20).reshape((1, 2, 2)),
np.array(
[[[0, 1], [2, 3]], [[16, 17], [18, 19]], [[np.nan, np.nan], [np.nan, np.nan]], [[12, 13], [14, 15]]]
),
np.array([[[0, 1], [2, 3]], [[16, 17], [18, 19]], [[12, 13], [14, 15]]]),
),
# Broadcast both
(
np.arange(4).reshape((1, 2, 2)),
np.arange(16, 20).reshape((1, 2, 2)),
np.array([[[0, 1], [2, 3]], [[16, 17], [18, 19]], [[np.nan, np.nan], [np.nan, np.nan]], [[0, 1], [2, 3]]]),
np.array([[[0, 1], [2, 3]], [[16, 17], [18, 19]], [[0, 1], [2, 3]]]),
),
],
)
@@ -467,7 +463,12 @@ def test_series_if_else_canonical_tensor_extension_type(if_true, if_false, expec
DataType.from_arrow_type(if_true_arrow.type.storage_type.value_type), (2, 2)
)
result_arrow = result.to_arrow()
np.testing.assert_equal(result_arrow.to_numpy_ndarray(), expected)

# null element conversion to numpy is not well defined in pyarrow and changes between releases
# so this is a workaround to ensure our tests pass regardless of the pyarrow version
assert not result_arrow[2].is_valid
result_array_filtered = result_arrow.filter(pa.array([True, True, False, True]))
np.testing.assert_equal(result_array_filtered.to_numpy_ndarray(), expected)


@pytest.mark.parametrize(
