diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 894b54ef0c..cca46ce14f 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -22,7 +22,7 @@ jobs:
       matrix:
         python-version: ['3.8', '3.10']
         daft-runner: [py, ray]
-        pyarrow-version: [7.0.0, 12.0.1]
+        pyarrow-version: [7.0.0, 13.0.0]
         enable-aqe: [0, 1]
         os: [ubuntu-20.04, windows-latest]
         exclude:
diff --git a/benchmarking/parquet/benchmark-requirements.txt b/benchmarking/parquet/benchmark-requirements.txt
index 248520aae3..a308fcd5f5 100644
--- a/benchmarking/parquet/benchmark-requirements.txt
+++ b/benchmarking/parquet/benchmark-requirements.txt
@@ -1,5 +1,5 @@
 pytest==7.4.0
 pytest-benchmark==4.0.0
 pytest-memray==1.4.1
-pyarrow==12.0.1
+pyarrow==13.0.0
 boto3==1.28.3
diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py
index 824e2ea302..b366aba27f 100644
--- a/daft/dataframe/dataframe.py
+++ b/daft/dataframe/dataframe.py
@@ -1778,10 +1778,16 @@ def __contains__(self, col_name: str) -> bool:
         return col_name in self.column_names
 
     @DataframePublicAPI
-    def to_pandas(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> "pandas.DataFrame":
+    def to_pandas(
+        self, cast_tensors_to_ray_tensor_dtype: bool = False, coerce_temporal_nanoseconds: bool = False
+    ) -> "pandas.DataFrame":
         """Converts the current DataFrame to a `pandas DataFrame `__.
 
         If results have not computed yet, collect will be called.
 
+        Args:
+            cast_tensors_to_ray_tensor_dtype (bool): Whether to cast tensors to Ray tensor dtype. Defaults to False.
+            coerce_temporal_nanoseconds (bool): Whether to coerce temporal columns to nanoseconds. Only applicable to pandas version >= 2.0 and pyarrow version >= 13.0.0. Defaults to False. See `pyarrow.Table.to_pandas `__ for more information.
+
         Returns:
             pandas.DataFrame: `pandas DataFrame `__ converted from a Daft DataFrame
@@ -1795,6 +1801,7 @@ def to_pandas(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> "pandas.D
         pd_df = result.to_pandas(
             schema=self._builder.schema(),
             cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype,
+            coerce_temporal_nanoseconds=coerce_temporal_nanoseconds,
         )
         return pd_df
 
@@ -1901,6 +1908,8 @@ def to_ray_dataset(self) -> "ray.data.dataset.DataSet":
     @classmethod
     def _from_ray_dataset(cls, ds: "ray.data.dataset.DataSet") -> "DataFrame":
         """Creates a DataFrame from a `Ray Dataset `__."""
+        from ray.exceptions import RayTaskError
+
         context = get_context()
         if context.runner_config.name != "ray":
             raise ValueError("Daft needs to be running on the Ray Runner for this operation")
@@ -1912,7 +1921,18 @@ def _from_ray_dataset(cls, ds: "ray.data.dataset.DataSet") -> "DataFrame":
         partition_set, schema = ray_runner_io.partition_set_from_ray_dataset(ds)
         cache_entry = context.runner().put_partition_set_into_cache(partition_set)
 
-        size_bytes = partition_set.size_bytes()
+        try:
+            size_bytes = partition_set.size_bytes()
+        except RayTaskError as e:
+            import pyarrow as pa
+            from packaging.version import parse
+
+            if "extension" in str(e) and parse(pa.__version__) < parse("13.0.0"):
+                raise ValueError(
+                    f"Reading Ray Dataset tensors is only supported with PyArrow >= 13.0.0, found {pa.__version__}. See this issue for more information: https://github.com/apache/arrow/pull/35933"
+                ) from e
+            raise e
+
         num_rows = len(partition_set)
         assert size_bytes is not None, "In-memory data should always have non-None size in bytes"
         builder = LogicalPlanBuilder.from_in_memory_scan(
diff --git a/daft/runners/partitioning.py b/daft/runners/partitioning.py
index 7bcca40665..ff395594ff 100644
--- a/daft/runners/partitioning.py
+++ b/daft/runners/partitioning.py
@@ -217,11 +217,13 @@ def to_pandas(
         self,
         schema: Schema | None = None,
         cast_tensors_to_ray_tensor_dtype: bool = False,
+        coerce_temporal_nanoseconds: bool = False,
     ) -> pd.DataFrame:
         merged_partition = self._get_merged_vpartition()
         return merged_partition.to_pandas(
             schema=schema,
             cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype,
+            coerce_temporal_nanoseconds=coerce_temporal_nanoseconds,
         )
 
     def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> pa.Table:
diff --git a/daft/table/micropartition.py b/daft/table/micropartition.py
index c00ca23062..66d1f8dd99 100644
--- a/daft/table/micropartition.py
+++ b/daft/table/micropartition.py
@@ -150,9 +150,16 @@ def to_pydict(self) -> dict[str, list]:
     def to_pylist(self) -> list[dict[str, Any]]:
         return self.to_table().to_pylist()
 
-    def to_pandas(self, schema: Schema | None = None, cast_tensors_to_ray_tensor_dtype: bool = False) -> pd.DataFrame:
+    def to_pandas(
+        self,
+        schema: Schema | None = None,
+        cast_tensors_to_ray_tensor_dtype: bool = False,
+        coerce_temporal_nanoseconds: bool = False,
+    ) -> pd.DataFrame:
         return self.to_table().to_pandas(
-            schema=schema, cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype
+            schema=schema,
+            cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype,
+            coerce_temporal_nanoseconds=coerce_temporal_nanoseconds,
         )
 
     ###
diff --git a/daft/table/table.py b/daft/table/table.py
index 2166ddcc8b..23854735c8 100644
--- a/daft/table/table.py
+++ b/daft/table/table.py
@@ -217,7 +217,14 @@ def to_pylist(self) -> list[dict[str, Any]]:
         column_names = self.column_names()
         return [{colname: table[colname][i] for colname in column_names} for i in range(len(self))]
 
-    def to_pandas(self, schema: Schema | None = None, cast_tensors_to_ray_tensor_dtype: bool = False) -> pd.DataFrame:
+    def to_pandas(
+        self,
+        schema: Schema | None = None,
+        cast_tensors_to_ray_tensor_dtype: bool = False,
+        coerce_temporal_nanoseconds: bool = False,
+    ) -> pd.DataFrame:
+        from packaging.version import parse
+
         if not _PANDAS_AVAILABLE:
             raise ImportError("Unable to import Pandas - please ensure that it is installed.")
         python_fields = set()
@@ -236,12 +243,20 @@ def to_pandas(self, schema: Schema | None = None, cast_tensors_to_ray_tensor_dty
                     column = column_series.to_pylist()
                 else:
                     # Arrow-native field, so provide column as Arrow array.
-                    column = column_series.to_arrow(cast_tensors_to_ray_tensor_dtype).to_pandas()
+                    column_arrow = column_series.to_arrow(cast_tensors_to_ray_tensor_dtype)
+                    if parse(pa.__version__) < parse("13.0.0"):
+                        column = column_arrow.to_pandas()
+                    else:
+                        column = column_arrow.to_pandas(coerce_temporal_nanoseconds=coerce_temporal_nanoseconds)
                 table[colname] = column
 
             return pd.DataFrame.from_dict(table)
         else:
-            return self.to_arrow(cast_tensors_to_ray_tensor_dtype).to_pandas()
+            arrow_table = self.to_arrow(cast_tensors_to_ray_tensor_dtype)
+            if parse(pa.__version__) < parse("13.0.0"):
+                return arrow_table.to_pandas()
+            else:
+                return arrow_table.to_pandas(coerce_temporal_nanoseconds=coerce_temporal_nanoseconds)
 
     ###
     # Compute methods (Table -> Table)
diff --git a/daft/table/table_io.py b/daft/table/table_io.py
index b8954d81cc..d47e606264 100644
--- a/daft/table/table_io.py
+++ b/daft/table/table_io.py
@@ -453,7 +453,7 @@ def write_tabular(
         format = pads.ParquetFileFormat()
         inflation_factor = execution_config.parquet_inflation_factor
         target_file_size = execution_config.parquet_target_filesize
-        opts = format.make_write_options(compression=compression)
+        opts = format.make_write_options(compression=compression, use_compliant_nested_type=False)
     elif file_format == FileFormat.Csv:
         format = pads.CsvFileFormat()
         opts = None
@@ -633,12 +633,14 @@ def file_visitor(written_file, protocol=protocol):
 
     format = pads.ParquetFileFormat()
 
+    opts = format.make_write_options(compression="zstd", use_compliant_nested_type=False)
+
     _write_tabular_arrow_table(
         arrow_table=arrow_table,
         schema=file_schema,
         full_path=resolved_path,
         format=format,
-        opts=format.make_write_options(compression="zstd"),
+        opts=opts,
         fs=fs,
         rows_per_file=rows_per_file,
         rows_per_row_group=rows_per_row_group,
@@ -735,10 +737,10 @@ def file_visitor(written_file: Any) -> None:
     target_row_groups = max(math.ceil(size_bytes / target_row_group_size / inflation_factor), 1)
     rows_per_row_group = max(min(math.ceil(num_rows / target_row_groups), rows_per_file), 1)
 
-    opts = pads.ParquetFileFormat().make_write_options(use_compliant_nested_type=False)
-
     format = pads.ParquetFileFormat()
 
+    opts = format.make_write_options(use_compliant_nested_type=False)
+
     _write_tabular_arrow_table(
         arrow_table=arrow_batch,
         schema=None,
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 29b0ef07d3..623fdefd08 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -34,7 +34,7 @@ Pillow==9.5.0
 opencv-python==4.8.1.78
 
 # Pyarrow
-pyarrow==12.0.1
+pyarrow==13.0.0
 # Ray
 ray[data, client]==2.7.1; python_version < '3.8'
 ray[data, client]==2.10.0; python_version >= '3.8'
diff --git a/tests/integration/io/docker-compose/retry_server/retry-server-requirements.txt b/tests/integration/io/docker-compose/retry_server/retry-server-requirements.txt
index d3d0697331..fc550f4eb1 100644
--- a/tests/integration/io/docker-compose/retry_server/retry-server-requirements.txt
+++ b/tests/integration/io/docker-compose/retry_server/retry-server-requirements.txt
@@ -17,7 +17,7 @@ uvicorn==0.23.2
 uvloop==0.17.0
 watchfiles==0.19.0
 websockets==11.0.3
-pyarrow==12.0.1
+pyarrow==13.0.0
 slowapi==0.1.8
 
 # Pin numpy version otherwise pyarrow doesn't work
diff --git a/tests/integration/sql/test_sql.py b/tests/integration/sql/test_sql.py
index ed3821208d..3ca30ce6b2 100644
--- a/tests/integration/sql/test_sql.py
+++ b/tests/integration/sql/test_sql.py
@@ -28,7 +28,7 @@ def test_sql_show(test_db) -> None:
 def test_sql_create_dataframe_ok(test_db, pdf) -> None:
     df = daft.read_sql(f"SELECT * FROM {TEST_TABLE_NAME}", test_db)
 
-    assert_df_equals(df.to_pandas(), pdf, sort_key="id")
+    assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")
 
 
 @pytest.mark.integration()
@@ -40,7 +40,7 @@ def test_sql_partitioned_read(test_db, num_partitions, pdf) -> None:
     df = daft.read_sql(f"SELECT * FROM {TEST_TABLE_NAME}", test_db, partition_col="id")
     assert df.num_partitions() == num_partitions
 
-    assert_df_equals(df.to_pandas(), pdf, sort_key="id")
+    assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")
 
 
 @pytest.mark.integration()
@@ -53,7 +53,7 @@ def test_sql_partitioned_read_with_custom_num_partitions_and_partition_col(
         f"SELECT * FROM {TEST_TABLE_NAME}", test_db, partition_col=partition_col, num_partitions=num_partitions
     )
     assert df.num_partitions() == num_partitions
-    assert_df_equals(df.to_pandas(), pdf, sort_key="id")
+    assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")
 
 
 @pytest.mark.integration()
@@ -66,7 +66,7 @@ def test_sql_partitioned_read_with_non_uniformly_distributed_column(test_db, num
         num_partitions=num_partitions,
     )
     assert df.num_partitions() == num_partitions
-    assert_df_equals(df.to_pandas(), pdf, sort_key="id")
+    assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")
 
 
 @pytest.mark.integration()
@@ -122,7 +122,7 @@ def test_sql_read_with_binary_filter_pushdowns(test_db, column, operator, value,
         df = df.where(df[column] <= value)
         pdf = pdf[pdf[column] <= value]
 
-    assert_df_equals(df.to_pandas(), pdf, sort_key="id")
+    assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")
 
 
 @pytest.mark.integration()
@@ -133,7 +133,7 @@ def test_sql_read_with_is_null_filter_pushdowns(test_db, num_partitions, pdf) ->
 
     pdf = pdf[pdf["null_col"].isnull()]
 
-    assert_df_equals(df.to_pandas(), pdf, sort_key="id")
+    assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")
 
 
 @pytest.mark.integration()
@@ -144,7 +144,7 @@ def test_sql_read_with_not_null_filter_pushdowns(test_db, num_partitions, pdf) -
 
     pdf = pdf[pdf["null_col"].notnull()]
 
-    assert_df_equals(df.to_pandas(), pdf, sort_key="id")
+    assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")
 
 
 @pytest.mark.integration()
@@ -155,7 +155,7 @@ def test_sql_read_with_if_else_filter_pushdown(test_db, num_partitions, pdf) ->
 
     pdf = pdf[(pdf["id"] > 100) & (pdf["float_col"] > 150) | (pdf["float_col"] < 50)]
 
-    assert_df_equals(df.to_pandas(), pdf, sort_key="id")
+    assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")
 
 
 @pytest.mark.integration()
@@ -165,7 +165,7 @@ def test_sql_read_with_is_in_filter_pushdown(test_db, num_partitions, pdf) -> No
     df = df.where(df["id"].is_in([1, 2, 3]))
     pdf = pdf[pdf["id"].isin([1, 2, 3])]
 
-    assert_df_equals(df.to_pandas(), pdf, sort_key="id")
+    assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")
 
 
 @pytest.mark.integration()
@@ -220,7 +220,7 @@ def create_conn():
         return sqlalchemy.create_engine(test_db).connect()
 
     df = daft.read_sql(f"SELECT * FROM {TEST_TABLE_NAME}", create_conn)
-    assert_df_equals(df.to_pandas(), pdf, sort_key="id")
+    assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")
 
 
 @pytest.mark.integration()
diff --git a/tests/series/test_if_else.py b/tests/series/test_if_else.py
index 090d232538..896afde4a6 100644
--- a/tests/series/test_if_else.py
+++ b/tests/series/test_if_else.py
@@ -428,29 +428,25 @@ def test_series_if_else_extension_type(uuid_ext_type, if_true_storage, if_false_
         (
             np.arange(16).reshape((4, 2, 2)),
            np.arange(16, 32).reshape((4, 2, 2)),
-            np.array(
-                [[[0, 1], [2, 3]], [[20, 21], [22, 23]], [[np.nan, np.nan], [np.nan, np.nan]], [[12, 13], [14, 15]]]
-            ),
+            np.array([[[0, 1], [2, 3]], [[20, 21], [22, 23]], [[12, 13], [14, 15]]]),
         ),
         # Broadcast left
         (
             np.arange(4).reshape((1, 2, 2)),
             np.arange(16, 32).reshape((4, 2, 2)),
-            np.array([[[0, 1], [2, 3]], [[20, 21], [22, 23]], [[np.nan, np.nan], [np.nan, np.nan]], [[0, 1], [2, 3]]]),
+            np.array([[[0, 1], [2, 3]], [[20, 21], [22, 23]], [[0, 1], [2, 3]]]),
         ),
         # Broadcast right
         (
             np.arange(16).reshape((4, 2, 2)),
             np.arange(16, 20).reshape((1, 2, 2)),
-            np.array(
-                [[[0, 1], [2, 3]], [[16, 17], [18, 19]], [[np.nan, np.nan], [np.nan, np.nan]], [[12, 13], [14, 15]]]
-            ),
+            np.array([[[0, 1], [2, 3]], [[16, 17], [18, 19]], [[12, 13], [14, 15]]]),
         ),
         # Broadcast both
         (
             np.arange(4).reshape((1, 2, 2)),
             np.arange(16, 20).reshape((1, 2, 2)),
-            np.array([[[0, 1], [2, 3]], [[16, 17], [18, 19]], [[np.nan, np.nan], [np.nan, np.nan]], [[0, 1], [2, 3]]]),
+            np.array([[[0, 1], [2, 3]], [[16, 17], [18, 19]], [[0, 1], [2, 3]]]),
         ),
     ],
 )
@@ -467,7 +463,12 @@ def test_series_if_else_canonical_tensor_extension_type(if_true, if_false, expec
         DataType.from_arrow_type(if_true_arrow.type.storage_type.value_type), (2, 2)
     )
     result_arrow = result.to_arrow()
-    np.testing.assert_equal(result_arrow.to_numpy_ndarray(), expected)
+
+    # null element conversion to numpy is not well defined in pyarrow and changes between releases
+    # so this is a workaround to ensure our tests pass regardless of the pyarrow version
+    assert not result_arrow[2].is_valid
+    result_array_filtered = result_arrow.filter(pa.array([True, True, False, True]))
+    np.testing.assert_equal(result_array_filtered.to_numpy_ndarray(), expected)
 
 
 @pytest.mark.parametrize(
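A note on the `coerce_temporal_nanoseconds` flag threaded through the `to_pandas` call chain above: the version gate exists because the kwarg only appears in `pyarrow.Table.to_pandas` from pyarrow 13.0.0 onward, and it only changes behavior when paired with pandas >= 2.0. A minimal standalone sketch (not part of the diff) of the behavior it controls:

```python
# Sketch, assuming pyarrow >= 13.0.0 and pandas >= 2.0 are installed.
import pyarrow as pa

table = pa.table({"ts": pa.array([1, 2, 3], type=pa.timestamp("us"))})

# pandas 2.x can store non-nanosecond datetimes, so pyarrow 13 stops coercing by default:
print(table.to_pandas().dtypes)  # ts    datetime64[us]

# Opting in restores the pre-13 behavior of nanosecond-precision temporal columns,
# which is what the SQL integration tests above rely on when comparing against pdf.
print(table.to_pandas(coerce_temporal_nanoseconds=True).dtypes)  # ts    datetime64[ns]
```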
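Similarly, the explicit `use_compliant_nested_type=False` passed to the Parquet write options in `table_io.py` appears to compensate for pyarrow 13.0.0 flipping that writer option's default to `True`. A rough sketch (illustrative file names, assuming pyarrow >= 13.0.0) of what the option changes in the written Parquet schema:

```python
# Sketch: the option controls the field name used for list elements in written Parquet files.
import tempfile
from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"col": pa.array([[1, 2], [3]], type=pa.list_(pa.int64()))})

with tempfile.TemporaryDirectory() as tmp:
    compliant = Path(tmp) / "compliant.parquet"
    legacy = Path(tmp) / "legacy.parquet"

    pq.write_table(table, compliant)  # pyarrow 13 default: spec-compliant nested naming
    pq.write_table(table, legacy, use_compliant_nested_type=False)  # pre-13 behavior

    # The inner list field is typically named "element" in the compliant file and
    # "item" in the legacy one; passing the flag explicitly keeps the written schema
    # consistent with what was produced under pyarrow 12.
    print(pq.read_schema(compliant))
    print(pq.read_schema(legacy))
```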