diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 174c01be07..cc2eb12639 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -21,6 +21,7 @@ jobs:
       matrix:
         python-version: ['3.7', '3.10']
         daft-runner: [py, ray]
+        new-query-planner: [1, 0]
         pyarrow-version: [6.0.1, 12.0]
         exclude:
         - daft-runner: ray
@@ -74,6 +75,7 @@ jobs:
         # cargo llvm-cov --no-run --lcov --output-path report-output/rust-coverage-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.daft-runner }}.lcov
       env:
         DAFT_RUNNER: ${{ matrix.daft-runner }}
+        DAFT_NEW_QUERY_PLANNER: ${{ matrix.new-query-planner }}
 
     - name: Upload coverage report
       uses: actions/upload-artifact@v3
@@ -149,6 +151,7 @@ jobs:
       matrix:
         python-version: ['3.7']
         daft-runner: [py, ray]
+        new-query-planner: [1, 0]
     steps:
     - uses: actions/checkout@v3
       with:
@@ -183,6 +186,7 @@ jobs:
         pytest tests/integration/test_tpch.py --durations=50
       env:
         DAFT_RUNNER: ${{ matrix.daft-runner }}
+        DAFT_NEW_QUERY_PLANNER: ${{ matrix.new-query-planner }}
     - name: Send Slack notification on failure
       uses: slackapi/slack-github-action@v1.24.0
       if: ${{ failure() && (github.ref == 'refs/heads/main') }}
@@ -215,6 +219,7 @@ jobs:
       matrix:
         python-version: ['3.8'] # can't use 3.7 due to requiring anon mode for adlfs
         daft-runner: [py, ray]
+        new-query-planner: [1, 0]
     # These permissions are needed to interact with GitHub's OIDC Token endpoint.
     # This is used in the step "Assume GitHub Actions AWS Credentials"
     permissions:
@@ -263,6 +268,7 @@ jobs:
         pytest tests/integration/io -m 'integration' --durations=50
       env:
         DAFT_RUNNER: ${{ matrix.daft-runner }}
+        DAFT_NEW_QUERY_PLANNER: ${{ matrix.new-query-planner }}
     - name: Send Slack notification on failure
       uses: slackapi/slack-github-action@v1.24.0
       if: ${{ failure() && (github.ref == 'refs/heads/main') }}
diff --git a/daft/context.py b/daft/context.py
index a6b515047c..777b64d56a 100644
--- a/daft/context.py
+++ b/daft/context.py
@@ -57,7 +57,7 @@ def _get_runner_config_from_env() -> _RunnerConfig:
 
 def _get_planner_from_env() -> bool:
     """Returns whether or not to use the new query planner."""
-    return bool(int(os.getenv("DAFT_DEVELOPER_RUST_QUERY_PLANNER", default="0")))
+    return bool(int(os.getenv("DAFT_NEW_QUERY_PLANNER", default="0")))
 
 
 @dataclasses.dataclass(frozen=True)
@@ -193,7 +193,7 @@ def set_new_planner() -> DaftContext:
 
     WARNING: The new query planner is currently experimental and only partially implemented.
 
-    Alternatively, users can set this behavior via an environment variable: DAFT_DEVELOPER_RUST_QUERY_PLANNER=1
+    Alternatively, users can set this behavior via an environment variable: DAFT_NEW_QUERY_PLANNER=1
 
     Returns:
         DaftContext: Daft context after enabling the new query planner.
@@ -210,7 +210,7 @@ def set_new_planner() -> DaftContext:
 def set_old_planner() -> DaftContext:
     """Enable the old query planner.
 
-    Alternatively, users can set this behavior via an environment variable: DAFT_DEVELOPER_RUST_QUERY_PLANNER=0
+    Alternatively, users can set this behavior via an environment variable: DAFT_NEW_QUERY_PLANNER=0
 
     Returns:
         DaftContext: Daft context after enabling the old query planner.
diff --git a/daft/logical/rust_logical_plan.py b/daft/logical/rust_logical_plan.py
index ad66fbf11f..70fab82377 100644
--- a/daft/logical/rust_logical_plan.py
+++ b/daft/logical/rust_logical_plan.py
@@ -132,7 +132,10 @@ def explode(self, explode_expressions: ExpressionsProjection) -> RustLogicalPlan
     def count(self) -> RustLogicalPlanBuilder:
         # TODO(Clark): Add dedicated logical/physical ops when introducing metadata-based count optimizations.
         first_col = col(self.schema().column_names()[0])
-        builder = self._builder.aggregate([first_col._count(CountMode.All)], [])
+        builder = self._builder.aggregate([first_col._count(CountMode.All)._expr], [])
+        rename_expr = ExpressionsProjection([first_col.alias("count")])
+        schema = rename_expr.resolve_schema(Schema._from_pyschema(builder.schema()))
+        builder = builder.project(rename_expr.to_inner_py_exprs(), schema._schema, ResourceRequest())
         return RustLogicalPlanBuilder(builder)
 
     def distinct(self) -> RustLogicalPlanBuilder:
diff --git a/tests/conftest.py b/tests/conftest.py
index 5e73f7e0eb..5927c33f21 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -4,14 +4,6 @@
 import pyarrow as pa
 import pytest
 
-from daft.context import (
-    DaftContext,
-    _set_context,
-    get_context,
-    set_new_planner,
-    set_old_planner,
-)
-
 
 def pytest_configure(config):
     config.addinivalue_line(
@@ -44,13 +36,6 @@ def uuid_ext_type() -> UuidType:
     pa.unregister_extension_type(ext_type.NAME)
 
 
-@pytest.fixture(params=[False, True])
-def use_new_planner(request) -> DaftContext:
-    old_ctx = get_context()
-    yield set_new_planner() if request.param else set_old_planner()
-    _set_context(old_ctx)
-
-
 def assert_df_equals(
     daft_df: pd.DataFrame,
     pd_df: pd.DataFrame,
diff --git a/tests/cookbook/test_aggregations.py b/tests/cookbook/test_aggregations.py
index 9206c72404..e1ad6b39d6 100644
--- a/tests/cookbook/test_aggregations.py
+++ b/tests/cookbook/test_aggregations.py
@@ -7,7 +7,7 @@
 from tests.conftest import assert_df_equals
 
 
-def test_sum(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner):
+def test_sum(daft_df, service_requests_csv_pd_df, repartition_nparts):
     """Sums across an entire column for the entire table"""
     daft_df = daft_df.repartition(repartition_nparts).sum(col("Unique Key").alias("unique_key_sum"))
     service_requests_csv_pd_df = pd.DataFrame.from_records(
@@ -17,7 +17,7 @@ def test_sum(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_pl
     assert_df_equals(daft_pd_df, service_requests_csv_pd_df, sort_key="unique_key_sum")
 
 
-def test_mean(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner):
+def test_mean(daft_df, service_requests_csv_pd_df, repartition_nparts):
     """Averages across a column for entire table"""
     daft_df = daft_df.repartition(repartition_nparts).mean(col("Unique Key").alias("unique_key_mean"))
     service_requests_csv_pd_df = pd.DataFrame.from_records(
@@ -27,7 +27,7 @@ def test_mean(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_p
     assert_df_equals(daft_pd_df, service_requests_csv_pd_df, sort_key="unique_key_mean")
 
 
-def test_min(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner):
+def test_min(daft_df, service_requests_csv_pd_df, repartition_nparts):
     """min across a column for entire table"""
     daft_df = daft_df.repartition(repartition_nparts).min(col("Unique Key").alias("unique_key_min"))
     service_requests_csv_pd_df = pd.DataFrame.from_records(
@@ -37,7 +37,7 @@ def test_min(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_pl
assert_df_equals(daft_pd_df, service_requests_csv_pd_df, sort_key="unique_key_min") -def test_max(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner): +def test_max(daft_df, service_requests_csv_pd_df, repartition_nparts): """max across a column for entire table""" daft_df = daft_df.repartition(repartition_nparts).max(col("Unique Key").alias("unique_key_max")) service_requests_csv_pd_df = pd.DataFrame.from_records( @@ -47,7 +47,7 @@ def test_max(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_pl assert_df_equals(daft_pd_df, service_requests_csv_pd_df, sort_key="unique_key_max") -def test_count(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner): +def test_count(daft_df, service_requests_csv_pd_df, repartition_nparts): """count a column for entire table""" daft_df = daft_df.repartition(repartition_nparts).count(col("Unique Key").alias("unique_key_count")) service_requests_csv_pd_df = pd.DataFrame.from_records( @@ -58,7 +58,7 @@ def test_count(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_ assert_df_equals(daft_pd_df, service_requests_csv_pd_df, sort_key="unique_key_count") -def test_list(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner): +def test_list(daft_df, service_requests_csv_pd_df, repartition_nparts): """list agg a column for entire table""" daft_df = daft_df.repartition(repartition_nparts).agg_list(col("Unique Key").alias("unique_key_list")).collect() unique_key_list = service_requests_csv_pd_df["Unique Key"].to_list() @@ -68,7 +68,7 @@ def test_list(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_p assert set(result_list[0]) == set(unique_key_list) -def test_global_agg(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner): +def test_global_agg(daft_df, service_requests_csv_pd_df, repartition_nparts): """Averages across a column for entire table""" daft_df = daft_df.repartition(repartition_nparts).agg( [ @@ -92,7 +92,7 @@ def test_global_agg(daft_df, service_requests_csv_pd_df, repartition_nparts, use assert_df_equals(daft_pd_df, service_requests_csv_pd_df, sort_key="unique_key_mean") -def test_filtered_sum(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner): +def test_filtered_sum(daft_df, service_requests_csv_pd_df, repartition_nparts): """Sums across an entire column for the entire table filtered by a certain condition""" daft_df = ( daft_df.repartition(repartition_nparts) @@ -119,7 +119,7 @@ def test_filtered_sum(daft_df, service_requests_csv_pd_df, repartition_nparts, u pytest.param(["Borough", "Complaint Type"], id="NumGroupByKeys:2"), ], ) -def test_sum_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys, use_new_planner): +def test_sum_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys): """Sums across groups""" daft_df = daft_df.repartition(repartition_nparts).groupby(*[col(k) for k in keys]).sum(col("Unique Key")) service_requests_csv_pd_df = service_requests_csv_pd_df.groupby(keys).sum("Unique Key").reset_index() @@ -134,7 +134,7 @@ def test_sum_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, ke pytest.param(["Borough", "Complaint Type"], id="NumGroupByKeys:2"), ], ) -def test_mean_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys, use_new_planner): +def test_mean_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys): """Sums across groups""" daft_df = daft_df.repartition(repartition_nparts).groupby(*[col(k) for k in 
keys]).mean(col("Unique Key")) service_requests_csv_pd_df = service_requests_csv_pd_df.groupby(keys).mean("Unique Key").reset_index() @@ -149,7 +149,7 @@ def test_mean_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, k pytest.param(["Borough", "Complaint Type"], id="NumGroupByKeys:2"), ], ) -def test_count_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys, use_new_planner): +def test_count_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys): """count across groups""" daft_df = daft_df.repartition(repartition_nparts).groupby(*[col(k) for k in keys]).count() service_requests_csv_pd_df = service_requests_csv_pd_df.groupby(keys).count().reset_index() @@ -167,7 +167,7 @@ def test_count_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, pytest.param(["Borough", "Complaint Type"], id="NumGroupByKeys:2"), ], ) -def test_min_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys, use_new_planner): +def test_min_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys): """min across groups""" daft_df = ( daft_df.repartition(repartition_nparts) @@ -188,7 +188,7 @@ def test_min_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, ke pytest.param(["Borough", "Complaint Type"], id="NumGroupByKeys:2"), ], ) -def test_max_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys, use_new_planner): +def test_max_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys): """max across groups""" daft_df = ( daft_df.repartition(repartition_nparts) @@ -209,7 +209,7 @@ def test_max_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, ke pytest.param(["Borough", "Complaint Type"], id="NumGroupSortKeys:2"), ], ) -def test_sum_groupby_sorted(daft_df, service_requests_csv_pd_df, repartition_nparts, keys, use_new_planner): +def test_sum_groupby_sorted(daft_df, service_requests_csv_pd_df, repartition_nparts, keys): """Test sorting after a groupby""" daft_df = ( daft_df.repartition(repartition_nparts) diff --git a/tests/cookbook/test_count_rows.py b/tests/cookbook/test_count_rows.py index d801fb7f58..e5afb8f0ee 100644 --- a/tests/cookbook/test_count_rows.py +++ b/tests/cookbook/test_count_rows.py @@ -5,13 +5,13 @@ from daft.expressions import col -def test_count_rows(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner): +def test_count_rows(daft_df, service_requests_csv_pd_df, repartition_nparts): """Count rows for the entire table""" daft_df_row_count = daft_df.repartition(repartition_nparts).count_rows() assert daft_df_row_count == service_requests_csv_pd_df.shape[0] -def test_filtered_count_rows(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner): +def test_filtered_count_rows(daft_df, service_requests_csv_pd_df, repartition_nparts): """Count rows on a table filtered by a certain condition""" daft_df_row_count = daft_df.repartition(repartition_nparts).where(col("Borough") == "BROOKLYN").count_rows() @@ -26,20 +26,20 @@ def test_filtered_count_rows(daft_df, service_requests_csv_pd_df, repartition_np pytest.param(["Borough", "Complaint Type"], id="NumGroupByKeys:2"), ], ) -def test_groupby_count_rows(daft_df, service_requests_csv_pd_df, repartition_nparts, keys, use_new_planner): +def test_groupby_count_rows(daft_df, service_requests_csv_pd_df, repartition_nparts, keys): """Count rows after group by""" daft_df = daft_df.repartition(repartition_nparts).groupby(*[col(k) for k in keys]).sum(col("Unique Key")) 
     service_requests_csv_pd_df = service_requests_csv_pd_df.groupby(keys).sum("Unique Key").reset_index()
     assert daft_df.count_rows() == len(service_requests_csv_pd_df)
 
 
-def test_dataframe_length_after_collect(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner):
+def test_dataframe_length_after_collect(daft_df, service_requests_csv_pd_df, repartition_nparts):
     """Count rows after group by"""
     daft_df = daft_df.repartition(repartition_nparts).collect()
     assert len(daft_df) == len(service_requests_csv_pd_df)
 
 
-def test_dataframe_length_before_collect(daft_df, use_new_planner):
+def test_dataframe_length_before_collect(daft_df):
     """Count rows for the entire table"""
     with pytest.raises(RuntimeError) as err_info:
         len(daft_df)
diff --git a/tests/cookbook/test_distinct.py b/tests/cookbook/test_distinct.py
index 8ece5a6534..a209ad80f3 100644
--- a/tests/cookbook/test_distinct.py
+++ b/tests/cookbook/test_distinct.py
@@ -13,7 +13,7 @@
         pytest.param(["Borough", "Complaint Type"], id="NumGroupByKeys:2"),
     ],
 )
-def test_distinct_all_columns(daft_df, service_requests_csv_pd_df, repartition_nparts, keys, use_new_planner):
+def test_distinct_all_columns(daft_df, service_requests_csv_pd_df, repartition_nparts, keys):
     """Sums across groups"""
     daft_df = daft_df.repartition(repartition_nparts).select(*[col(k) for k in keys]).distinct()
 
diff --git a/tests/cookbook/test_filter.py b/tests/cookbook/test_filter.py
index d8da8544e8..f69361a731 100644
--- a/tests/cookbook/test_filter.py
+++ b/tests/cookbook/test_filter.py
@@ -30,7 +30,7 @@
         ),
     ],
 )
-def test_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner):
+def test_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts):
     """Filter the dataframe, retrieve the top N results and select a subset of columns"""
     daft_noise_complaints = daft_df_ops(daft_df.repartition(repartition_nparts))
 
@@ -83,7 +83,7 @@
         ),
     ],
 )
-def test_complex_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner):
+def test_complex_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts):
     """Filter the dataframe with a complex filter and select a subset of columns"""
     daft_noise_complaints_brooklyn = daft_df_ops(daft_df.repartition(repartition_nparts))
 
@@ -127,7 +127,7 @@
         ),
     ],
 )
-def test_chain_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner):
+def test_chain_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts):
     """Filter the dataframe with a chain of filters and select a subset of columns"""
     daft_noise_complaints_brooklyn = daft_df_ops(daft_df.repartition(repartition_nparts))
 
@@ -142,7 +142,7 @@ def test_chain_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartit
     assert_df_equals(daft_pd_df, pd_noise_complaints_brooklyn)
 
 
-def test_filter_on_projection(use_new_planner):
+def test_filter_on_projection():
     """Filter the dataframe with on top of a projection"""
     df = daft.from_pydict({"x": [1, 1, 1, 1, 1]})
     df = df.select(col("x") * 2)
diff --git a/tests/cookbook/test_joins.py b/tests/cookbook/test_joins.py
index 61a555902f..4ec962e328 100644
--- a/tests/cookbook/test_joins.py
+++ b/tests/cookbook/test_joins.py
@@ -4,7 +4,7 @@
 from tests.conftest import assert_df_equals
 
 
-def test_simple_join(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner):
+def test_simple_join(daft_df, service_requests_csv_pd_df, repartition_nparts):
     daft_df = daft_df.repartition(repartition_nparts)
     daft_df_left = daft_df.select(col("Unique Key"), col("Borough"))
     daft_df_right = daft_df.select(col("Unique Key"), col("Created Date"))
@@ -21,7 +21,7 @@ def test_simple_join(daft_df, service_requests_csv_pd_df, repartition_nparts, us
     assert_df_equals(daft_pd_df, service_requests_csv_pd_df)
 
 
-def test_simple_self_join(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner):
+def test_simple_self_join(daft_df, service_requests_csv_pd_df, repartition_nparts):
     daft_df = daft_df.repartition(repartition_nparts)
     daft_df = daft_df.select(col("Unique Key"), col("Borough"))
 
@@ -38,7 +38,7 @@ def test_simple_self_join(daft_df, service_requests_csv_pd_df, repartition_npart
     assert_df_equals(daft_pd_df, service_requests_csv_pd_df)
 
 
-def test_simple_join_missing_rvalues(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner):
+def test_simple_join_missing_rvalues(daft_df, service_requests_csv_pd_df, repartition_nparts):
     daft_df_right = daft_df.sort("Unique Key").limit(25).repartition(repartition_nparts)
     daft_df_left = daft_df.repartition(repartition_nparts)
     daft_df_left = daft_df_left.select(col("Unique Key"), col("Borough"))
@@ -58,7 +58,7 @@ def test_simple_join_missing_rvalues(daft_df, service_requests_csv_pd_df, repart
     assert_df_equals(daft_pd_df, service_requests_csv_pd_df)
 
 
-def test_simple_join_missing_lvalues(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner):
+def test_simple_join_missing_lvalues(daft_df, service_requests_csv_pd_df, repartition_nparts):
     daft_df_right = daft_df.repartition(repartition_nparts)
     daft_df_left = daft_df.sort(col("Unique Key")).limit(25).repartition(repartition_nparts)
     daft_df_left = daft_df_left.select(col("Unique Key"), col("Borough"))
diff --git a/tests/cookbook/test_sorting.py b/tests/cookbook/test_sorting.py
index 7960364ff2..71a098e8d0 100644
--- a/tests/cookbook/test_sorting.py
+++ b/tests/cookbook/test_sorting.py
@@ -7,7 +7,7 @@
 from tests.conftest import assert_df_equals
 
 
-def test_sorted_by_expr(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner):
+def test_sorted_by_expr(daft_df, service_requests_csv_pd_df, repartition_nparts):
     """Sort by a column that undergoes an expression"""
     daft_df = daft_df.repartition(repartition_nparts)
     daft_sorted_df = daft_df.sort(((col("Unique Key") % 2) == 0).if_else(col("Unique Key"), col("Unique Key") * -1))
@@ -36,7 +36,7 @@ def test_sorted_by_expr(daft_df, service_requests_csv_pd_df, repartition_nparts,
         pytest.param(["Borough", "Unique Key"], id="NumSortKeys:2"),
     ],
 )
-def test_get_sorted(daft_df, service_requests_csv_pd_df, repartition_nparts, sort_keys, use_new_planner):
+def test_get_sorted(daft_df, service_requests_csv_pd_df, repartition_nparts, sort_keys):
     """Sort by a column"""
     daft_df = daft_df.repartition(repartition_nparts)
     daft_sorted_df = daft_df.sort([col(k) for k in sort_keys], desc=True)
@@ -55,7 +55,7 @@ def test_get_sorted(daft_df, service_requests_csv_pd_df, repartition_nparts, sor
         pytest.param(["Borough", "Unique Key"], id="NumSortKeys:2"),
     ],
 )
-def test_get_sorted_top_n(daft_df, service_requests_csv_pd_df, repartition_nparts, sort_keys, use_new_planner):
+def test_get_sorted_top_n(daft_df, service_requests_csv_pd_df, repartition_nparts, sort_keys):
     """Sort by a column"""
     daft_df = daft_df.repartition(repartition_nparts)
     daft_sorted_df = daft_df.sort([col(k) for k in sort_keys], desc=True).limit(100)
@@ -74,9 +74,7 @@ def test_get_sorted_top_n(daft_df, service_requests_csv_pd_df, repartition_npart
         pytest.param(["Borough", "Unique Key"], id="NumSortKeys:2"),
     ],
 )
-def test_get_sorted_top_n_flipped_desc(
-    daft_df, service_requests_csv_pd_df, repartition_nparts, sort_keys, use_new_planner
-):
+def test_get_sorted_top_n_flipped_desc(daft_df, service_requests_csv_pd_df, repartition_nparts, sort_keys):
     """Sort by a column"""
     daft_df = daft_df.repartition(repartition_nparts)
     desc_list = [True]
@@ -105,9 +103,7 @@ def test_get_sorted_top_n_flipped_desc(
         ),
     ],
 )
-def test_get_sorted_top_n_projected(
-    daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner
-):
+def test_get_sorted_top_n_projected(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts):
     """Sort by a column and retrieve specific columns from the top N results"""
     daft_df = daft_df.repartition(repartition_nparts)
     expected = service_requests_csv_pd_df.sort_values(by="Unique Key", ascending=False)[
diff --git a/tests/cookbook/test_write.py b/tests/cookbook/test_write.py
index db2aca6c3c..17d3ebad12 100644
--- a/tests/cookbook/test_write.py
+++ b/tests/cookbook/test_write.py
@@ -7,7 +7,7 @@
 from tests.cookbook.assets import COOKBOOK_DATA_CSV
 
 
-def test_parquet_write(tmp_path, use_new_planner):
+def test_parquet_write(tmp_path):
     df = daft.read_csv(COOKBOOK_DATA_CSV)
 
     pd_df = df.write_parquet(tmp_path)
@@ -17,7 +17,7 @@ def test_parquet_write(tmp_path, use_new_planner):
     assert len(pd_df.to_pandas()) == 1
 
 
-def test_parquet_write_with_partitioning(tmp_path, use_new_planner):
+def test_parquet_write_with_partitioning(tmp_path):
     df = daft.read_csv(COOKBOOK_DATA_CSV)
 
     pd_df = df.write_parquet(tmp_path, partition_cols=["Borough"])
@@ -28,7 +28,7 @@ def test_parquet_write_with_partitioning(tmp_path, use_new_planner):
     assert len(pd_df.to_pandas()) == 5
 
 
-def test_csv_write(tmp_path, use_new_planner):
+def test_csv_write(tmp_path):
     df = daft.read_csv(COOKBOOK_DATA_CSV)
 
     pd_df = df.write_csv(tmp_path)
@@ -40,7 +40,7 @@ def test_csv_write(tmp_path, use_new_planner):
 
 
 @pytest.mark.skip()
-def test_csv_write_with_partitioning(tmp_path, use_new_planner):
+def test_csv_write_with_partitioning(tmp_path):
     df = daft.read_csv(COOKBOOK_DATA_CSV)
 
     pd_df = df.write_csv(tmp_path, partition_cols=["Borough"]).to_pandas()
diff --git a/tests/dataframe/test_accessors.py b/tests/dataframe/test_accessors.py
index 3b7dd9c29d..3299507e7f 100644
--- a/tests/dataframe/test_accessors.py
+++ b/tests/dataframe/test_accessors.py
@@ -11,14 +11,14 @@ def df():
     return daft.from_pydict({"foo": [1, 2, 3]})
 
 
-def test_num_partitions(df, use_new_planner):
+def test_num_partitions(df):
     assert df.num_partitions() == 1
 
     df2 = df.repartition(2)
     assert df2.num_partitions() == 2
 
 
-def test_schema(df, use_new_planner):
+def test_schema(df):
     fields = [f for f in df.schema()]
     assert len(fields) == 1
     [field] = fields
@@ -26,11 +26,11 @@ def test_schema(df, use_new_planner):
     assert field.dtype == DataType.int64()
 
 
-def test_column_names(df, use_new_planner):
+def test_column_names(df):
     assert df.column_names == ["foo"]
 
 
-def test_columns(df, use_new_planner):
+def test_columns(df):
     assert len(df.columns) == 1
     [ex] = df.columns
     assert ex.name() == "foo"
diff --git a/tests/dataframe/test_aggregations.py b/tests/dataframe/test_aggregations.py
index 29d33e64c8..1667204fbb 100644
--- a/tests/dataframe/test_aggregations.py
+++ b/tests/dataframe/test_aggregations.py
@@ -15,7 +15,7 @@
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 4])
-def test_agg_global(repartition_nparts, use_new_planner):
+def test_agg_global(repartition_nparts):
     daft_df = daft.from_pydict(
         {
             "id": [1, 2, 3],
@@ -46,7 +46,7 @@ def test_agg_global(repartition_nparts, use_new_planner):
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 4])
-def test_agg_global_all_null(repartition_nparts, use_new_planner):
+def test_agg_global_all_null(repartition_nparts):
     daft_df = daft.from_pydict(
         {
             "id": [0, 1, 2, 3],
@@ -82,7 +82,7 @@ def test_agg_global_all_null(repartition_nparts, use_new_planner):
     assert pa.Table.from_pydict(daft_cols) == pa.Table.from_pydict(expected)
 
 
-def test_agg_global_empty(use_new_planner):
+def test_agg_global_empty():
     daft_df = daft.from_pydict(
         {
             "id": [0],
@@ -119,7 +119,7 @@ def test_agg_global_empty(use_new_planner):
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 7])
-def test_agg_groupby(repartition_nparts, use_new_planner):
+def test_agg_groupby(repartition_nparts):
     daft_df = daft.from_pydict(
         {
             "group": [1, 1, 1, 2, 2, 2],
@@ -164,7 +164,7 @@ def test_agg_groupby(repartition_nparts, use_new_planner):
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 5])
-def test_agg_groupby_all_null(repartition_nparts, use_new_planner):
+def test_agg_groupby_all_null(repartition_nparts):
     daft_df = daft.from_pydict(
         {
             "id": [0, 1, 2, 3, 4],
@@ -203,7 +203,7 @@ def test_agg_groupby_all_null(repartition_nparts, use_new_planner):
     )
 
 
-def test_agg_groupby_null_type_column(use_new_planner):
+def test_agg_groupby_null_type_column():
     daft_df = daft.from_pydict(
         {
             "id": [1, 2, 3, 4],
@@ -222,7 +222,7 @@ def test_agg_groupby_null_type_column(use_new_planner):
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 5])
-def test_null_groupby_keys(repartition_nparts, use_new_planner):
+def test_null_groupby_keys(repartition_nparts):
     daft_df = daft.from_pydict(
         {
             "id": [0, 1, 2, 3, 4],
@@ -252,7 +252,7 @@ def test_null_groupby_keys(repartition_nparts, use_new_planner):
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 4])
-def test_all_null_groupby_keys(repartition_nparts, use_new_planner):
+def test_all_null_groupby_keys(repartition_nparts):
     daft_df = daft.from_pydict(
         {
             "id": [0, 1, 2],
@@ -281,7 +281,7 @@ def test_all_null_groupby_keys(repartition_nparts, use_new_planner):
     assert set(daft_cols["list"][0]) == {1, 2, 3}
 
 
-def test_null_type_column_groupby_keys(use_new_planner):
+def test_null_type_column_groupby_keys():
     daft_df = daft.from_pydict(
         {
             "id": [0, 1, 2],
@@ -294,7 +294,7 @@ def test_null_type_column_groupby_keys(use_new_planner):
         daft_df.groupby(col("group"))
 
 
-def test_agg_groupby_empty(use_new_planner):
+def test_agg_groupby_empty():
     daft_df = daft.from_pydict(
         {
             "id": [0],
@@ -337,7 +337,7 @@ class CustomObject:
     val: int
 
 
-def test_agg_pyobjects(use_new_planner):
+def test_agg_pyobjects():
     objects = [CustomObject(val=0), None, CustomObject(val=1)]
     df = daft.from_pydict({"objs": objects})
     df = df.into_partitions(2)
@@ -354,7 +354,7 @@ def test_agg_pyobjects(use_new_planner):
     assert res["list"] == [objects]
 
 
-def test_groupby_agg_pyobjects(use_new_planner):
+def test_groupby_agg_pyobjects():
     objects = [CustomObject(val=0), CustomObject(val=1), None, None, CustomObject(val=2)]
     df = daft.from_pydict({"objects": objects, "groups": [1, 2, 1, 2, 1]})
     df = df.into_partitions(2)
diff --git a/tests/dataframe/test_concat.py b/tests/dataframe/test_concat.py
index b37b4dc339..34367f5e02 100644
--- a/tests/dataframe/test_concat.py
+++ b/tests/dataframe/test_concat.py
@@ -5,14 +5,14 @@
 import daft
 
 
-def test_simple_concat(use_new_planner):
+def test_simple_concat():
     df1 = daft.from_pydict({"foo": [1, 2, 3]})
     df2 = daft.from_pydict({"foo": [4, 5, 6]})
     result = df1.concat(df2)
     assert result.to_pydict() == {"foo": [1, 2, 3, 4, 5, 6]}
 
 
-def test_concat_schema_mismatch(use_new_planner):
+def test_concat_schema_mismatch():
     df1 = daft.from_pydict({"foo": [1, 2, 3]})
     df2 = daft.from_pydict({"foo": ["4", "5", "6"]})
     with pytest.raises(ValueError):
diff --git a/tests/dataframe/test_creation.py b/tests/dataframe/test_creation.py
index a708f1bb49..616745f6bb 100644
--- a/tests/dataframe/test_creation.py
+++ b/tests/dataframe/test_creation.py
@@ -83,17 +83,17 @@ def test_create_dataframe_empty_list() -> None:
 ###
 
 
-def test_create_dataframe_list(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_create_dataframe_list(valid_data: list[dict[str, float]]) -> None:
     df = daft.from_pylist(valid_data)
     assert set(df.column_names) == set(COL_NAMES)
 
 
-def test_create_dataframe_list_empty(use_new_planner) -> None:
+def test_create_dataframe_list_empty() -> None:
     df = daft.from_pylist([])
     assert df.column_names == []
 
 
-def test_create_dataframe_list_ragged_keys(use_new_planner) -> None:
+def test_create_dataframe_list_ragged_keys() -> None:
     df = daft.from_pylist(
         [
             {"foo": 1},
@@ -108,12 +108,12 @@ def test_create_dataframe_list_ragged_keys(use_new_planner) -> None:
     }
 
 
-def test_create_dataframe_list_empty_dicts(use_new_planner) -> None:
+def test_create_dataframe_list_empty_dicts() -> None:
     df = daft.from_pylist([{}, {}, {}])
     assert df.column_names == []
 
 
-def test_create_dataframe_list_non_dicts(use_new_planner) -> None:
+def test_create_dataframe_list_non_dicts() -> None:
     with pytest.raises(ValueError) as e:
         daft.from_pylist([1, 2, 3])
     assert "Expected list of dictionaries of {column_name: value}" in str(e.value)
@@ -124,24 +124,24 @@ def test_create_dataframe_list_non_dicts(use_new_planner) -> None:
 ###
 
 
-def test_create_dataframe_pydict(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_create_dataframe_pydict(valid_data: list[dict[str, float]]) -> None:
     pydict = {k: [item[k] for item in valid_data] for k in valid_data[0].keys()}
     df = daft.from_pydict(pydict)
     assert set(df.column_names) == set(COL_NAMES)
 
 
-def test_create_dataframe_empty_pydict(use_new_planner) -> None:
+def test_create_dataframe_empty_pydict() -> None:
     df = daft.from_pydict({})
     assert df.column_names == []
 
 
-def test_create_dataframe_pydict_ragged_col_lens(use_new_planner) -> None:
+def test_create_dataframe_pydict_ragged_col_lens() -> None:
     with pytest.raises(ValueError) as e:
         daft.from_pydict({"foo": [1, 2], "bar": [1, 2, 3]})
     assert "Expected all columns to be of the same length" in str(e.value)
 
 
-def test_create_dataframe_pydict_bad_columns(use_new_planner) -> None:
+def test_create_dataframe_pydict_bad_columns() -> None:
     with pytest.raises(ValueError) as e:
         daft.from_pydict({"foo": "somestring"})
     assert "Creating a Series from data of type" in str(e.value)
@@ -153,7 +153,7 @@ def test_create_dataframe_pydict_bad_columns(use_new_planner) -> None:
 
 
 @pytest.mark.parametrize("multiple", [False, True])
-def test_create_dataframe_arrow(valid_data: list[dict[str, float]], multiple, use_new_planner) -> None:
+def test_create_dataframe_arrow(valid_data: list[dict[str, float]], multiple) -> None:
     t = pa.Table.from_pydict({k: [valid_data[i][k] for i in range(len(valid_data))] for k in valid_data[0].keys()})
     if multiple:
         t = [t, t, t]
@@ -167,7 +167,7 @@ def test_create_dataframe_arrow(valid_data: list[dict[str, float]], multiple, us
     assert df.to_arrow() == expected
 
 
-def test_create_dataframe_arrow_tensor_ray(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_create_dataframe_arrow_tensor_ray(valid_data: list[dict[str, float]]) -> None:
     pydict = {k: [item[k] for item in valid_data] for k in valid_data[0].keys()}
     shape = (2, 2)
     arr = np.ones((len(valid_data),) + shape)
@@ -190,7 +190,7 @@ def test_create_dataframe_arrow_tensor_ray(valid_data: list[dict[str, float]], u
     not pyarrow_supports_fixed_shape_tensor(),
     reason=f"Arrow version {ARROW_VERSION} doesn't support the canonical tensor extension type.",
 )
-def test_create_dataframe_arrow_tensor_canonical(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_create_dataframe_arrow_tensor_canonical(valid_data: list[dict[str, float]]) -> None:
     pydict = {k: [item[k] for item in valid_data] for k in valid_data[0].keys()}
     shape = (2, 2)
     dtype = pa.fixed_shape_tensor(pa.int64(), shape)
@@ -207,9 +207,7 @@ def test_create_dataframe_arrow_tensor_canonical(valid_data: list[dict[str, floa
     assert df.to_arrow() == expected
 
 
-def test_create_dataframe_arrow_extension_type(
-    valid_data: list[dict[str, float]], uuid_ext_type: UuidType, use_new_planner
-) -> None:
+def test_create_dataframe_arrow_extension_type(valid_data: list[dict[str, float]], uuid_ext_type: UuidType) -> None:
     pydict = {k: [item[k] for item in valid_data] for k in valid_data[0].keys()}
     storage = pa.array([f"{i}".encode() for i in range(len(valid_data))])
     pydict["obj"] = pa.ExtensionArray.from_storage(uuid_ext_type, storage)
@@ -233,7 +231,7 @@ def __reduce__(self):
         return PyExtType, ()
 
 
-def test_create_dataframe_arrow_py_ext_type_raises(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_create_dataframe_arrow_py_ext_type_raises(valid_data: list[dict[str, float]]) -> None:
     pydict = {k: [item[k] for item in valid_data] for k in valid_data[0].keys()}
     uuid_type = PyExtType()
     storage_array = pa.array([f"foo-{i}".encode() for i in range(len(valid_data))], pa.binary())
@@ -244,7 +242,7 @@ def test_create_dataframe_arrow_py_ext_type_raises(valid_data: list[dict[str, fl
         daft.from_arrow(t)
 
 
-def test_create_dataframe_arrow_unsupported_dtype(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_create_dataframe_arrow_unsupported_dtype(valid_data: list[dict[str, float]]) -> None:
     pydict = {k: [item[k] for item in valid_data] for k in valid_data[0].keys()}
     pydict["obj"] = [
         decimal.Decimal("12456789012345678901234567890123456789012345678901234567890") for _ in range(len(valid_data))
@@ -266,7 +264,7 @@ def test_create_dataframe_arrow_unsupported_dtype(valid_data: list[dict[str, flo
 
 
 @pytest.mark.parametrize("multiple", [False, True])
-def test_create_dataframe_pandas(valid_data: list[dict[str, float]], multiple, use_new_planner) -> None:
+def test_create_dataframe_pandas(valid_data: list[dict[str, float]], multiple) -> None:
     pd_df = pd.DataFrame(valid_data)
     if multiple:
         pd_df = [pd_df, pd_df, pd_df]
@@ -278,7 +276,7 @@ def test_create_dataframe_pandas(valid_data: list[dict[str, float]], multiple, u
     pd.testing.assert_frame_equal(df.to_pandas(), pd_df)
 
 
-def test_create_dataframe_pandas_py_object(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_create_dataframe_pandas_py_object(valid_data: list[dict[str, float]]) -> None:
     pydict = {k: [item[k] for item in valid_data] for k in valid_data[0].keys()}
     pydict["obj"] = [MyObjWithValue(i) for i in range(len(valid_data))]
     pd_df = pd.DataFrame(pydict)
@@ -290,7 +288,7 @@ def test_create_dataframe_pandas_py_object(valid_data: list[dict[str, float]], u
     pd.testing.assert_frame_equal(df.to_pandas(), pd_df)
 
 
-def test_create_dataframe_pandas_tensor(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_create_dataframe_pandas_tensor(valid_data: list[dict[str, float]]) -> None:
     pydict = {k: [item[k] for item in valid_data] for k in valid_data[0].keys()}
     shape = (2, 2)
     pydict["tensor"] = TensorArray(np.ones((len(valid_data),) + shape))
@@ -337,7 +335,7 @@ def test_create_dataframe_pandas_tensor(valid_data: list[dict[str, float]], use_
         pytest.param(np.ones((3, 3, 3)), DataType.tensor(DataType.float64()), id="np_nested_nd"),
     ],
 )
-def test_load_pydict_types(data, expected_dtype, use_new_planner):
+def test_load_pydict_types(data, expected_dtype):
     data_dict = {"x": data}
     daft_df = daft.from_pydict(data_dict)
     daft_df.collect()
@@ -352,7 +350,7 @@ def test_load_pydict_types(data, expected_dtype, use_new_planner):
 ###
 
 
-def test_create_dataframe_csv(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_create_dataframe_csv(valid_data: list[dict[str, float]]) -> None:
     with tempfile.NamedTemporaryFile("w") as f:
         header = list(valid_data[0].keys())
         writer = csv.writer(f)
@@ -368,7 +366,7 @@ def test_create_dataframe_csv(valid_data: list[dict[str, float]], use_new_planne
     assert len(pd_df) == len(valid_data)
 
 
-def test_create_dataframe_multiple_csvs(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_create_dataframe_multiple_csvs(valid_data: list[dict[str, float]]) -> None:
     with tempfile.NamedTemporaryFile("w") as f1, tempfile.NamedTemporaryFile("w") as f2:
         for f in (f1, f2):
             header = list(valid_data[0].keys())
@@ -385,7 +383,10 @@ def test_create_dataframe_multiple_csvs(valid_data: list[dict[str, float]], use_
     assert len(pd_df) == (len(valid_data) * 2)
 
 
-@pytest.mark.skipif(get_context().runner_config.name not in {"py"}, reason="requires PyRunner to be in use")
+@pytest.mark.skipif(
+    (get_context().runner_config.name not in {"py"}) or get_context().use_rust_planner,
+    reason="requires PyRunner and old query planner to be in use",
+)
 def test_create_dataframe_csv_custom_fs(valid_data: list[dict[str, float]]) -> None:
     with tempfile.NamedTemporaryFile("w") as f:
         header = list(valid_data[0].keys())
@@ -416,7 +417,7 @@ def test_create_dataframe_csv_custom_fs(valid_data: list[dict[str, float]]) -> N
     assert len(pd_df) == len(valid_data)
 
 
-def test_create_dataframe_csv_generate_headers(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_create_dataframe_csv_generate_headers(valid_data: list[dict[str, float]]) -> None:
     with tempfile.NamedTemporaryFile("w") as f:
         header = list(valid_data[0].keys())
         writer = csv.writer(f)
@@ -432,7 +433,7 @@ def test_create_dataframe_csv_generate_headers(valid_data: list[dict[str, float]
     assert len(pd_df) == len(valid_data)
 
 
-def test_create_dataframe_csv_column_projection(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_create_dataframe_csv_column_projection(valid_data: list[dict[str, float]]) -> None:
     with tempfile.NamedTemporaryFile("w") as f:
         header = list(valid_data[0].keys())
         writer = csv.writer(f)
@@ -451,7 +452,7 @@ def test_create_dataframe_csv_column_projection(valid_data: list[dict[str, float
     assert len(pd_df) == len(valid_data)
 
 
-def test_create_dataframe_csv_custom_delimiter(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_create_dataframe_csv_custom_delimiter(valid_data: list[dict[str, float]]) -> None:
     with tempfile.NamedTemporaryFile("w") as f:
         header = list(valid_data[0].keys())
         writer = csv.writer(f, delimiter="\t")
@@ -467,7 +468,7 @@ def test_create_dataframe_csv_custom_delimiter(valid_data: list[dict[str, float]
     assert len(pd_df) == len(valid_data)
 
 
-def test_create_dataframe_csv_specify_schema(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_create_dataframe_csv_specify_schema(valid_data: list[dict[str, float]]) -> None:
     with tempfile.NamedTemporaryFile("w") as f:
         header = list(valid_data[0].keys())
         writer = csv.writer(f, delimiter="\t")
@@ -493,7 +494,7 @@ def test_create_dataframe_csv_specify_schema(valid_data: list[dict[str, float]],
     assert len(pd_df) == len(valid_data)
 
 
-def test_create_dataframe_csv_specify_schema_no_headers(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_create_dataframe_csv_specify_schema_no_headers(valid_data: list[dict[str, float]]) -> None:
     with tempfile.NamedTemporaryFile("w") as f:
         header = list(valid_data[0].keys())
         writer = csv.writer(f, delimiter="\t")
@@ -524,7 +525,7 @@ def test_create_dataframe_csv_specify_schema_no_headers(valid_data: list[dict[st
 ###
 
 
-def test_create_dataframe_json(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_create_dataframe_json(valid_data: list[dict[str, float]]) -> None:
     with tempfile.NamedTemporaryFile("w") as f:
         for data in valid_data:
             f.write(json.dumps(data))
@@ -539,7 +540,7 @@ def test_create_dataframe_json(valid_data: list[dict[str, float]], use_new_plann
     assert len(pd_df) == len(valid_data)
 
 
-def test_create_dataframe_multiple_jsons(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_create_dataframe_multiple_jsons(valid_data: list[dict[str, float]]) -> None:
     with tempfile.NamedTemporaryFile("w") as f1, tempfile.NamedTemporaryFile("w") as f2:
         for f in (f1, f2):
             for data in valid_data:
@@ -555,7 +556,10 @@ def test_create_dataframe_multiple_jsons(valid_data: list[dict[str, float]], use
     assert len(pd_df) == (len(valid_data) * 2)
 
 
-@pytest.mark.skipif(get_context().runner_config.name not in {"py"}, reason="requires PyRunner to be in use")
+@pytest.mark.skipif(
+    (get_context().runner_config.name not in {"py"}) or get_context().use_rust_planner,
+    reason="requires PyRunner and old query planner to be in use",
+)
 def test_create_dataframe_json_custom_fs(valid_data: list[dict[str, float]]) -> None:
     with tempfile.NamedTemporaryFile("w") as f:
         for data in valid_data:
@@ -586,7 +590,7 @@ def test_create_dataframe_json_custom_fs(valid_data: list[dict[str, float]]) ->
     assert len(pd_df) == len(valid_data)
 
 
-def test_create_dataframe_json_column_projection(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_create_dataframe_json_column_projection(valid_data: list[dict[str, float]]) -> None:
     with tempfile.NamedTemporaryFile("w") as f:
         for data in valid_data:
             f.write(json.dumps(data))
@@ -604,14 +608,14 @@ def test_create_dataframe_json_column_projection(valid_data: list[dict[str, floa
     assert len(pd_df) == len(valid_data)
 
 
-def test_create_dataframe_json_https(use_new_planner) -> None:
+def test_create_dataframe_json_https() -> None:
     df = daft.read_json("https://github.com/Eventual-Inc/mnist-json/raw/master/mnist_handwritten_test.json.gz")
     df.collect()
     assert set(df.column_names) == {"label", "image"}
     assert len(df) == 10000
 
 
-def test_create_dataframe_json_specify_schema(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_create_dataframe_json_specify_schema(valid_data: list[dict[str, float]]) -> None:
     with tempfile.NamedTemporaryFile("w") as f:
         for data in valid_data:
             f.write(json.dumps(data))
@@ -641,7 +645,7 @@ def test_create_dataframe_json_specify_schema(valid_data: list[dict[str, float]]
 
 
 @pytest.mark.parametrize("use_native_downloader", [True, False])
-def test_create_dataframe_parquet(valid_data: list[dict[str, float]], use_native_downloader, use_new_planner) -> None:
+def test_create_dataframe_parquet(valid_data: list[dict[str, float]], use_native_downloader) -> None:
     with tempfile.NamedTemporaryFile("w") as f:
         table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES})
         papq.write_table(table, f.name)
@@ -656,9 +660,7 @@ def test_create_dataframe_parquet(valid_data: list[dict[str, float]], use_native
 
 
 @pytest.mark.parametrize("use_native_downloader", [True, False])
-def test_create_dataframe_parquet_with_filter(
-    valid_data: list[dict[str, float]], use_native_downloader, use_new_planner
-) -> None:
+def test_create_dataframe_parquet_with_filter(valid_data: list[dict[str, float]], use_native_downloader) -> None:
     with tempfile.NamedTemporaryFile("w") as f:
         table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES})
         papq.write_table(table, f.name)
@@ -675,9 +677,7 @@ def test_create_dataframe_parquet_with_filter(
 
 
 @pytest.mark.parametrize("use_native_downloader", [True, False])
-def test_create_dataframe_multiple_parquets(
-    valid_data: list[dict[str, float]], use_native_downloader, use_new_planner
-) -> None:
+def test_create_dataframe_multiple_parquets(valid_data: list[dict[str, float]], use_native_downloader) -> None:
     with tempfile.NamedTemporaryFile("w") as f1, tempfile.NamedTemporaryFile("w") as f2:
         for f in (f1, f2):
             table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES})
@@ -692,7 +692,10 @@ def test_create_dataframe_multiple_parquets(
     assert len(pd_df) == (len(valid_data) * 2)
 
 
-@pytest.mark.skipif(get_context().runner_config.name not in {"py"}, reason="requires PyRunner to be in use")
+@pytest.mark.skipif(
+    (get_context().runner_config.name not in {"py"}) or get_context().use_rust_planner,
+    reason="requires PyRunner and old query planner to be in use",
+)
 def test_create_dataframe_parquet_custom_fs(valid_data: list[dict[str, float]]) -> None:
     with tempfile.NamedTemporaryFile("w") as f:
         table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES})
@@ -723,9 +726,7 @@ def test_create_dataframe_parquet_custom_fs(valid_data: list[dict[str, float]])
 
 
 @pytest.mark.parametrize("use_native_downloader", [True, False])
-def test_create_dataframe_parquet_column_projection(
-    valid_data: list[dict[str, float]], use_native_downloader, use_new_planner
-) -> None:
+def test_create_dataframe_parquet_column_projection(valid_data: list[dict[str, float]], use_native_downloader) -> None:
     with tempfile.NamedTemporaryFile("w") as f:
         table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES})
         papq.write_table(table, f.name)
@@ -743,9 +744,7 @@ def test_create_dataframe_parquet_column_projection(
 
 
 @pytest.mark.parametrize("use_native_downloader", [True, False])
-def test_create_dataframe_parquet_specify_schema(
-    valid_data: list[dict[str, float]], use_native_downloader, use_new_planner
-) -> None:
+def test_create_dataframe_parquet_specify_schema(valid_data: list[dict[str, float]], use_native_downloader) -> None:
     with tempfile.NamedTemporaryFile("w") as f:
         table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES})
         papq.write_table(table, f.name)
diff --git a/tests/dataframe/test_decimals.py b/tests/dataframe/test_decimals.py
index df237c1ef1..530146a098 100644
--- a/tests/dataframe/test_decimals.py
+++ b/tests/dataframe/test_decimals.py
@@ -10,7 +10,7 @@
 PYARROW_GE_7_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) >= (7, 0, 0)
 
 
-def test_decimal_parquet_roundtrip(use_new_planner) -> None:
+def test_decimal_parquet_roundtrip() -> None:
     python_decimals = [decimal.Decimal("-2.010"), decimal.Decimal("0.000"), decimal.Decimal("2.010")]
     data = {
         "decimal128": pa.array(python_decimals),
@@ -27,7 +27,7 @@ def test_decimal_parquet_roundtrip(use_new_planner) -> None:
         assert str(df.to_pydict()["decimal128"]) == str(df_readback.to_pydict()["decimal128"])
 
 
-def test_arrow_decimal(use_new_planner) -> None:
+def test_arrow_decimal() -> None:
     # Test roundtrip of Arrow decimals.
     pa_table = pa.Table.from_pydict(
         {"decimal128": pa.array([decimal.Decimal("-1.010"), decimal.Decimal("0.000"), decimal.Decimal("1.010")])}
@@ -38,7 +38,7 @@ def test_arrow_decimal(use_new_planner) -> None:
     assert df.to_arrow() == pa_table
 
 
-def test_python_decimal(use_new_planner) -> None:
+def test_python_decimal() -> None:
     # Test roundtrip of Python decimals.
     python_decimals = [decimal.Decimal("-1.010"), decimal.Decimal("0.000"), decimal.Decimal("1.010")]
     df = daft.from_pydict({"decimal128": python_decimals})
diff --git a/tests/dataframe/test_distinct.py b/tests/dataframe/test_distinct.py
index de8e30e19a..d3d2b6adc5 100644
--- a/tests/dataframe/test_distinct.py
+++ b/tests/dataframe/test_distinct.py
@@ -9,7 +9,7 @@
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 5])
-def test_distinct_with_nulls(repartition_nparts, use_new_planner):
+def test_distinct_with_nulls(repartition_nparts):
     daft_df = daft.from_pydict(
         {
             "id": [1, None, None, None],
@@ -28,7 +28,7 @@ def test_distinct_with_nulls(repartition_nparts, use_new_planner):
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 5])
-def test_distinct_with_all_nulls(repartition_nparts, use_new_planner):
+def test_distinct_with_all_nulls(repartition_nparts):
     daft_df = daft.from_pydict(
         {
             "id": [None, None, None, None],
@@ -47,7 +47,7 @@ def test_distinct_with_all_nulls(repartition_nparts, use_new_planner):
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 2])
-def test_distinct_with_empty(repartition_nparts, use_new_planner):
+def test_distinct_with_empty(repartition_nparts):
     daft_df = daft.from_pydict(
         {
             "id": [1],
diff --git a/tests/dataframe/test_explode.py b/tests/dataframe/test_explode.py
index 83b58838ae..22190bc2b7 100644
--- a/tests/dataframe/test_explode.py
+++ b/tests/dataframe/test_explode.py
@@ -15,7 +15,7 @@
         Series.from_arrow(pa.array([[1, 2], [3, 4], None, []], type=pa.large_list(pa.int64()))),
     ],
 )
-def test_explode(data, use_new_planner):
+def test_explode(data):
     df = daft.from_pydict({"nested": data, "sidecar": ["a", "b", "c", "d"]})
     df = df.explode(col("nested"))
     assert df.to_pydict() == {"nested": [1, 2, 3, 4, None, None], "sidecar": ["a", "a", "b", "b", "c", "d"]}
@@ -28,7 +28,7 @@ def test_explode(data, use_new_planner):
         Series.from_arrow(pa.array([[1, 2], [3, 4], None, []], type=pa.large_list(pa.int64()))),
     ],
 )
-def test_explode_multiple_cols(data, use_new_planner):
+def test_explode_multiple_cols(data):
     df = daft.from_pydict({"nested": data, "nested2": data, "sidecar": ["a", "b", "c", "d"]})
     df = df.explode(col("nested"), col("nested2"))
     assert df.to_pydict() == {
@@ -38,7 +38,7 @@ def test_explode_multiple_cols(data, use_new_planner):
     }
 
 
-def test_explode_bad_col_type(use_new_planner):
+def test_explode_bad_col_type():
     df = daft.from_pydict({"a": [1, 2, 3]})
     with pytest.raises(ValueError, match="Datatype cannot be exploded:"):
         df = df.explode(col("a"))
diff --git a/tests/dataframe/test_filter.py b/tests/dataframe/test_filter.py
index f4a6878ec7..908da4b46c 100644
--- a/tests/dataframe/test_filter.py
+++ b/tests/dataframe/test_filter.py
@@ -8,14 +8,14 @@
 from daft import DataFrame
 
 
-def test_filter_missing_column(valid_data: list[dict[str, Any]], use_new_planner) -> None:
+def test_filter_missing_column(valid_data: list[dict[str, Any]]) -> None:
     df = daft.from_pylist(valid_data)
     with pytest.raises(ValueError):
         df.select("sepal_width").where(df["petal_length"] > 4.8)
 
 
 @pytest.mark.skip(reason="Requires Expression.float.is_nan()")
-def test_drop_na(missing_value_data: list[dict[str, Any]], use_new_planner) -> None:
+def test_drop_na(missing_value_data: list[dict[str, Any]]) -> None:
     df: DataFrame = daft.from_pylist(missing_value_data)
     df_len_no_col = len(df.drop_nan().collect())
     assert df_len_no_col == 2
@@ -25,7 +25,7 @@ def test_drop_na(missing_value_data: list[dict[str, Any]], use_new_planner) -> N
     assert df_len_col == 2
 
 
-def test_drop_null(missing_value_data: list[dict[str, Any]], use_new_planner) -> None:
+def test_drop_null(missing_value_data: list[dict[str, Any]]) -> None:
     df: DataFrame = daft.from_pylist(missing_value_data)
     df_len_no_col = len(df.drop_null().collect())
     assert df_len_no_col == 2
diff --git a/tests/dataframe/test_getitem.py b/tests/dataframe/test_getitem.py
index b4a3afe810..e4de2cca29 100644
--- a/tests/dataframe/test_getitem.py
+++ b/tests/dataframe/test_getitem.py
@@ -6,7 +6,7 @@
 from daft import DataFrame
 
 
-def test_dataframe_getitem_single(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_dataframe_getitem_single(valid_data: list[dict[str, float]]) -> None:
     df = daft.from_pylist(valid_data)
     expanded_df = df.with_column("foo", df["sepal_length"] + df["sepal_width"])
     # TODO(jay): Test that the expression with name "foo" is equal to the expected expression, except for the IDs of the columns
@@ -16,7 +16,7 @@ def test_dataframe_getitem_single(valid_data: list[dict[str, float]], use_new_pl
     assert df.select(df["sepal_length"]).column_names == ["sepal_length"]
 
 
-def test_dataframe_getitem_single_bad(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_dataframe_getitem_single_bad(valid_data: list[dict[str, float]]) -> None:
     df = daft.from_pylist(valid_data)
     with pytest.raises(ValueError, match="not found"):
         df["foo"]
@@ -28,7 +28,7 @@ def test_dataframe_getitem_single_bad(valid_data: list[dict[str, float]], use_ne
         df[100]
 
 
-def test_dataframe_getitem_multiple_bad(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_dataframe_getitem_multiple_bad(valid_data: list[dict[str, float]]) -> None:
     df = daft.from_pylist(valid_data)
     with pytest.raises(ValueError, match="not found"):
         df["foo", "bar"]
@@ -49,7 +49,7 @@ class A:
         df[A()]
 
 
-def test_dataframe_getitem_multiple(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_dataframe_getitem_multiple(valid_data: list[dict[str, float]]) -> None:
     df = daft.from_pylist(valid_data)
     expanded_df = df.with_column("foo", sum(df["sepal_length", "sepal_width"].columns))
     # TODO(jay): Test that the expression with name "foo" is equal to the expected expression, except for the IDs of the columns
@@ -58,13 +58,13 @@ def test_dataframe_getitem_multiple(valid_data: list[dict[str, float]], use_new_
     assert df["sepal_length", "sepal_width"].column_names == ["sepal_length", "sepal_width"]
 
 
-def test_dataframe_getitem_slice(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_dataframe_getitem_slice(valid_data: list[dict[str, float]]) -> None:
     df = daft.from_pylist(valid_data)
     slice_df = df[:]
     assert df.column_names == slice_df.column_names
 
 
-def test_dataframe_getitem_slice_rev(valid_data: list[dict[str, float]], use_new_planner) -> None:
+def test_dataframe_getitem_slice_rev(valid_data: list[dict[str, float]]) -> None:
     df = daft.from_pylist(valid_data)
     slice_df = df[::-1]
     assert df.column_names == slice_df.column_names[::-1]
diff --git a/tests/dataframe/test_iter.py b/tests/dataframe/test_iter.py
index d886142191..74fc52cc30 100644
--- a/tests/dataframe/test_iter.py
+++ b/tests/dataframe/test_iter.py
@@ -10,7 +10,7 @@ class MockException(Exception):
 
 
 @pytest.mark.parametrize("materialized", [False, True])
-def test_iter_rows(materialized, use_new_planner):
+def test_iter_rows(materialized):
     # Test that df.__iter__ produces the correct rows in the correct order.
     # It should work regardless of whether the dataframe has already been materialized or not.
 
@@ -23,7 +23,7 @@ def test_iter_rows(materialized, use_new_planner):
 
 
 @pytest.mark.parametrize("materialized", [False, True])
-def test_iter_partitions(materialized, use_new_planner):
+def test_iter_partitions(materialized):
     # Test that df.iter_partitions() produces partitions in the correct order.
     # It should work regardless of whether the dataframe has already been materialized or not.
 
@@ -48,7 +48,7 @@ def test_iter_partitions(materialized, use_new_planner):
     ]
 
 
-def test_iter_exception(use_new_planner):
+def test_iter_exception():
     # Test that df.__iter__ actually returns results before completing execution.
     # We test this by raising an exception in a UDF if too many partitions are executed.
 
@@ -70,7 +70,7 @@ def test_iter_exception(use_new_planner):
         list(it)
 
 
-def test_iter_partitions_exception(use_new_planner):
+def test_iter_partitions_exception():
     # Test that df.iter_partitions actually returns results before completing execution.
     # We test this by raising an exception in a UDF if too many partitions are executed.
 
diff --git a/tests/dataframe/test_joins.py b/tests/dataframe/test_joins.py
index 933b737c94..55797792ea 100644
--- a/tests/dataframe/test_joins.py
+++ b/tests/dataframe/test_joins.py
@@ -10,7 +10,7 @@
 
 
 @pytest.mark.parametrize("n_partitions", [1, 2, 4])
-def test_multicol_joins(n_partitions: int, use_new_planner):
+def test_multicol_joins(n_partitions: int):
     df = daft.from_pydict(
         {
             "A": [1, 2, 3],
@@ -31,7 +31,7 @@ def test_multicol_joins(n_partitions: int, use_new_planner):
 
 
 @pytest.mark.parametrize("n_partitions", [1, 2, 4])
-def test_limit_after_join(n_partitions: int, use_new_planner):
+def test_limit_after_join(n_partitions: int):
     data = {
         "A": [1, 2, 3],
     }
@@ -50,7 +50,7 @@ def test_limit_after_join(n_partitions: int, use_new_planner):
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 4])
-def test_inner_join(repartition_nparts, use_new_planner):
+def test_inner_join(repartition_nparts):
     daft_df = daft.from_pydict(
         {
             "id": [1, None, 3],
@@ -76,7 +76,7 @@ def test_inner_join(repartition_nparts, use_new_planner):
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 4])
-def test_inner_join_multikey(repartition_nparts, use_new_planner):
+def test_inner_join_multikey(repartition_nparts):
     daft_df = daft.from_pydict(
         {
             "id": [1, None, None],
@@ -105,7 +105,7 @@ def test_inner_join_multikey(repartition_nparts, use_new_planner):
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 4])
-def test_inner_join_all_null(repartition_nparts, use_new_planner):
+def test_inner_join_all_null(repartition_nparts):
     daft_df = daft.from_pydict(
         {
             "id": [None, None, None],
@@ -130,7 +130,7 @@ def test_inner_join_all_null(repartition_nparts, use_new_planner):
     )
 
 
-def test_inner_join_null_type_column(use_new_planner):
+def test_inner_join_null_type_column():
     daft_df = daft.from_pydict(
         {
             "id": [None, None, None],
diff --git a/tests/dataframe/test_logical_type.py b/tests/dataframe/test_logical_type.py
index 9558e0d006..342824a185 100644
--- a/tests/dataframe/test_logical_type.py
+++ b/tests/dataframe/test_logical_type.py
@@ -14,7 +14,7 @@
 ARROW_VERSION = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric())
 
 
-def test_embedding_type_df(use_new_planner) -> None:
+def test_embedding_type_df() -> None:
     data = [[1, 2, 3], np.arange(3), ["1", "2", "3"], [1, "2", 3.0], pd.Series([1.1, 2, 3]), (1, 2, 3), None]
     df = daft.from_pydict({"index": np.arange(len(data)), "embeddings": Series.from_pylist(data, pyobj="force")})
 
@@ -28,7 +28,7 @@ def test_embedding_type_df(use_new_planner) -> None:
 
 
 @pytest.mark.parametrize("from_pil_imgs", [True, False])
-def test_image_type_df(from_pil_imgs, use_new_planner) -> None:
+def test_image_type_df(from_pil_imgs) -> None:
     data = [
         np.arange(12, dtype=np.uint8).reshape((2, 2, 3)),
         np.arange(12, 39, dtype=np.uint8).reshape((3, 3, 3)),
@@ -50,7 +50,7 @@ def test_image_type_df(from_pil_imgs, use_new_planner) -> None:
         assert isinstance(arrow_table["image"].type, DaftExtension)
 
 
-def test_fixed_shape_image_type_df(use_new_planner) -> None:
+def test_fixed_shape_image_type_df() -> None:
     height = 2
     width = 2
     shape = (height, width, 3)
@@ -66,7 +66,7 @@ def test_fixed_shape_image_type_df(use_new_planner) -> None:
     assert isinstance(arrow_table["image"].type, DaftExtension)
 
 
-def test_tensor_type_df(use_new_planner) -> None:
+def test_tensor_type_df() -> None:
     data = [
         np.arange(12).reshape((3, 2, 2)),
         np.arange(12, 39).reshape((3, 3, 3)),
@@ -82,7 +82,7 @@ def test_tensor_type_df(use_new_planner) -> None:
     assert isinstance(arrow_table["tensor"].type, DaftExtension)
 
 
-def test_fixed_shape_tensor_type_df(use_new_planner) -> None:
+def test_fixed_shape_tensor_type_df() -> None:
     shape = (3, 2, 2)
     data = [
         np.arange(12).reshape(shape),
diff --git a/tests/dataframe/test_repartition.py b/tests/dataframe/test_repartition.py
index 84afd98304..92c96e5721 100644
--- a/tests/dataframe/test_repartition.py
+++ b/tests/dataframe/test_repartition.py
@@ -3,7 +3,7 @@
 import daft
 
 
-def test_into_partitions_some_empty(use_new_planner) -> None:
+def test_into_partitions_some_empty() -> None:
     data = {"foo": [1, 2, 3]}
     df = daft.from_pydict(data).into_partitions(32).collect()
     assert df.to_pydict() == data
diff --git a/tests/dataframe/test_repr.py b/tests/dataframe/test_repr.py
index 16e08bb887..92e16fccf2 100644
--- a/tests/dataframe/test_repr.py
+++ b/tests/dataframe/test_repr.py
@@ -71,7 +71,7 @@ def parse_html_table(
     return result
 
 
-def test_empty_repr(use_new_planner):
+def test_empty_repr():
     df = daft.from_pydict({})
     assert df.__repr__() == "(No data to display: Dataframe has no columns)"
     assert df._repr_html_() == "(No data to display: Dataframe has no columns)"
@@ -81,7 +81,7 @@ def test_empty_repr(use_new_planner):
     assert df._repr_html_() == "(No data to display: Dataframe has no columns)"
 
 
-def test_empty_df_repr(use_new_planner):
+def test_empty_df_repr():
     df = daft.from_pydict({"A": [1, 2, 3], "B": ["a", "b", "c"]})
     df = df.where(df["A"] > 10)
     expected_data = {"A": ("Int64", []), "B": ("Utf8", [])}
@@ -122,7 +122,7 @@ def test_empty_df_repr(use_new_planner):
     )
 
 
-def test_alias_repr(use_new_planner):
+def test_alias_repr():
     df = daft.from_pydict({"A": [1, 2, 3], "B": ["a", "b", "c"]})
     df = df.select(df["A"].alias("A2"), df["B"])
 
@@ -170,7 +170,7 @@ def test_alias_repr(use_new_planner):
     )
 
 
-def test_repr_with_html_string(use_new_planner):
+def test_repr_with_html_string():
     df = daft.from_pydict({"A": [f"