From 76666694058747a12daafd73db6b52fedc5cf09c Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Wed, 18 Sep 2024 17:01:30 -0700 Subject: [PATCH] [CHORE] Add native executor to CI (#2855) Enables native executor in CI (just unit tests for now). Any tests that fail because the native executor doesn't support it yet is marked as skip. The skips will be incrementally removed as functionality gets added. --------- Co-authored-by: Colin Ho --- .github/workflows/python-package.yml | 4 ++ tests/cookbook/test_aggregations.py | 6 +++ tests/cookbook/test_joins.py | 6 +++ tests/cookbook/test_pandas_cookbook.py | 5 +++ tests/cookbook/test_write.py | 5 +++ tests/dataframe/test_aggregations.py | 7 ++- tests/dataframe/test_approx_count_distinct.py | 7 ++- .../test_approx_percentiles_aggregations.py | 7 ++- tests/dataframe/test_concat.py | 7 +++ tests/dataframe/test_creation.py | 6 +++ tests/dataframe/test_decimals.py | 6 +++ tests/dataframe/test_distinct.py | 6 +++ tests/dataframe/test_explode.py | 6 +++ tests/dataframe/test_iter.py | 6 +++ tests/dataframe/test_joins.py | 7 ++- tests/dataframe/test_map_groups.py | 6 +++ .../test_monotonically_increasing_id.py | 6 +++ tests/dataframe/test_pivot.py | 7 +++ tests/dataframe/test_repr.py | 6 +++ tests/dataframe/test_sample.py | 7 +++ tests/dataframe/test_sort.py | 5 +++ tests/dataframe/test_temporals.py | 7 ++- tests/dataframe/test_transform.py | 6 +++ tests/dataframe/test_unpivot.py | 7 ++- tests/dataframe/test_wildcard.py | 7 ++- tests/integration/iceberg/conftest.py | 1 - tests/io/delta_lake/conftest.py | 43 +++++++++++++++---- tests/io/delta_lake/test_table_write.py | 18 ++++++-- tests/io/iceberg/test_iceberg_writes.py | 10 ++++- tests/io/lancedb/test_lancedb_reads.py | 9 +++- tests/io/lancedb/test_lancedb_writes.py | 9 +++- tests/io/test_csv_roundtrip.py | 6 ++- tests/io/test_parquet.py | 5 +++ tests/io/test_parquet_roundtrip.py | 7 +++ tests/io/test_s3_credentials_refresh.py | 6 +++ tests/test_resource_requests.py | 7 ++- 36 files changed, 252 insertions(+), 24 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index eaf02408e6..57062f994e 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -27,8 +27,11 @@ jobs: python-version: ['3.8', '3.10'] daft-runner: [py, ray] pyarrow-version: [7.0.0, 16.0.0] + enable-native-executor: [0, 1] os: [ubuntu-20.04, windows-latest] exclude: + - daft-runner: ray + enable-native-executor: 1 - daft-runner: ray pyarrow-version: 7.0.0 os: ubuntu-20.04 @@ -115,6 +118,7 @@ jobs: CARGO_TARGET_DIR: ./target DAFT_RUNNER: ${{ matrix.daft-runner }} + DAFT_ENABLE_NATIVE_EXECUTOR: ${{ matrix.enable-native-executor }} - name: Build library and Test with pytest (Windows) if: ${{ (runner.os == 'Windows') }} diff --git a/tests/cookbook/test_aggregations.py b/tests/cookbook/test_aggregations.py index 9eb2783da6..2112a4a41f 100644 --- a/tests/cookbook/test_aggregations.py +++ b/tests/cookbook/test_aggregations.py @@ -6,11 +6,17 @@ import pandas as pd import pytest +from daft import context from daft.datatype import DataType from daft.expressions import col from daft.udf import udf from tests.conftest import assert_df_equals +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + def test_sum(daft_df, service_requests_csv_pd_df, repartition_nparts): """Sums across an entire column for the entire table""" diff --git a/tests/cookbook/test_joins.py b/tests/cookbook/test_joins.py index cbc4743d0b..b51c863100 100644 --- a/tests/cookbook/test_joins.py +++ b/tests/cookbook/test_joins.py @@ -2,9 +2,15 @@ import pytest +from daft import context from daft.expressions import col from tests.conftest import assert_df_equals +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + @pytest.mark.parametrize( "join_strategy", [None, "hash", "sort_merge", "sort_merge_aligned_boundaries", "broadcast"], indirect=True diff --git a/tests/cookbook/test_pandas_cookbook.py b/tests/cookbook/test_pandas_cookbook.py index c852568f3f..56838f7490 100644 --- a/tests/cookbook/test_pandas_cookbook.py +++ b/tests/cookbook/test_pandas_cookbook.py @@ -7,10 +7,15 @@ import pytest import daft +from daft import context from daft.datatype import DataType from daft.expressions import col, lit from tests.conftest import assert_df_equals +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) ### # Idioms: if-then ### diff --git a/tests/cookbook/test_write.py b/tests/cookbook/test_write.py index c197ed922d..c209af7df0 100644 --- a/tests/cookbook/test_write.py +++ b/tests/cookbook/test_write.py @@ -7,9 +7,14 @@ from pyarrow import dataset as pads import daft +from daft import context from tests.conftest import assert_df_equals from tests.cookbook.assets import COOKBOOK_DATA_CSV +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) PYARROW_GE_7_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) >= (7, 0, 0) diff --git a/tests/dataframe/test_aggregations.py b/tests/dataframe/test_aggregations.py index 74fe889ce0..9fe567aaef 100644 --- a/tests/dataframe/test_aggregations.py +++ b/tests/dataframe/test_aggregations.py @@ -7,13 +7,18 @@ import pytest import daft -from daft import col +from daft import col, context from daft.context import get_context from daft.datatype import DataType from daft.errors import ExpressionTypeError from daft.utils import freeze from tests.utils import sort_arrow_table +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + @pytest.mark.parametrize("repartition_nparts", [1, 2, 4]) def test_agg_global(make_df, repartition_nparts): diff --git a/tests/dataframe/test_approx_count_distinct.py b/tests/dataframe/test_approx_count_distinct.py index 68d7057ca0..78d2a7b181 100644 --- a/tests/dataframe/test_approx_count_distinct.py +++ b/tests/dataframe/test_approx_count_distinct.py @@ -2,7 +2,12 @@ import pytest import daft -from daft import col +from daft import col, context + +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) TESTS = [ [[], 0], diff --git a/tests/dataframe/test_approx_percentiles_aggregations.py b/tests/dataframe/test_approx_percentiles_aggregations.py index d64f1a2381..3e6b3ecd87 100644 --- a/tests/dataframe/test_approx_percentiles_aggregations.py +++ b/tests/dataframe/test_approx_percentiles_aggregations.py @@ -4,7 +4,12 @@ import pyarrow as pa import pytest -from daft import col +from daft import col, context + +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) @pytest.mark.parametrize("repartition_nparts", [1, 2, 4]) diff --git a/tests/dataframe/test_concat.py b/tests/dataframe/test_concat.py index f3caf56bb1..07e06df59c 100644 --- a/tests/dataframe/test_concat.py +++ b/tests/dataframe/test_concat.py @@ -2,6 +2,13 @@ import pytest +from daft import context + +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + def test_simple_concat(make_df): df1 = make_df({"foo": [1, 2, 3]}) diff --git a/tests/dataframe/test_creation.py b/tests/dataframe/test_creation.py index d8f76feff2..c43751f7d3 100644 --- a/tests/dataframe/test_creation.py +++ b/tests/dataframe/test_creation.py @@ -15,12 +15,18 @@ import pytest import daft +from daft import context from daft.api_annotations import APITypeError from daft.dataframe import DataFrame from daft.datatype import DataType from daft.utils import pyarrow_supports_fixed_shape_tensor from tests.conftest import UuidType +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + ARROW_VERSION = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) diff --git a/tests/dataframe/test_decimals.py b/tests/dataframe/test_decimals.py index 3a2d11babe..daafec29f0 100644 --- a/tests/dataframe/test_decimals.py +++ b/tests/dataframe/test_decimals.py @@ -7,6 +7,12 @@ import pytest import daft +from daft import context + +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) PYARROW_GE_7_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) >= (7, 0, 0) diff --git a/tests/dataframe/test_distinct.py b/tests/dataframe/test_distinct.py index 8e4b2c0a85..8865a7890c 100644 --- a/tests/dataframe/test_distinct.py +++ b/tests/dataframe/test_distinct.py @@ -3,9 +3,15 @@ import pyarrow as pa import pytest +from daft import context from daft.datatype import DataType from tests.utils import sort_arrow_table +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + @pytest.mark.parametrize("repartition_nparts", [1, 2, 5]) def test_distinct_with_nulls(make_df, repartition_nparts): diff --git a/tests/dataframe/test_explode.py b/tests/dataframe/test_explode.py index 26416f9938..0e8dbd73d2 100644 --- a/tests/dataframe/test_explode.py +++ b/tests/dataframe/test_explode.py @@ -3,8 +3,14 @@ import pyarrow as pa import pytest +from daft import context from daft.expressions import col +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + @pytest.mark.parametrize( "data", diff --git a/tests/dataframe/test_iter.py b/tests/dataframe/test_iter.py index 7be026e2f7..8658e5da30 100644 --- a/tests/dataframe/test_iter.py +++ b/tests/dataframe/test_iter.py @@ -3,6 +3,12 @@ import pytest import daft +from daft import context + +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) class MockException(Exception): diff --git a/tests/dataframe/test_joins.py b/tests/dataframe/test_joins.py index 18ba1b8ab3..93292aabe1 100644 --- a/tests/dataframe/test_joins.py +++ b/tests/dataframe/test_joins.py @@ -3,11 +3,16 @@ import pyarrow as pa import pytest -from daft import col +from daft import col, context from daft.datatype import DataType from daft.errors import ExpressionTypeError from tests.utils import sort_arrow_table +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + def skip_invalid_join_strategies(join_strategy, join_type): if (join_strategy == "sort_merge" or join_strategy == "sort_merge_aligned_boundaries") and join_type != "inner": diff --git a/tests/dataframe/test_map_groups.py b/tests/dataframe/test_map_groups.py index 4f0f2e29ec..379e4b4bdc 100644 --- a/tests/dataframe/test_map_groups.py +++ b/tests/dataframe/test_map_groups.py @@ -3,6 +3,12 @@ import pytest import daft +from daft import context + +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) @pytest.mark.parametrize("repartition_nparts", [1, 2, 4]) diff --git a/tests/dataframe/test_monotonically_increasing_id.py b/tests/dataframe/test_monotonically_increasing_id.py index 895463da4a..fa57e89216 100644 --- a/tests/dataframe/test_monotonically_increasing_id.py +++ b/tests/dataframe/test_monotonically_increasing_id.py @@ -2,8 +2,14 @@ import pytest +from daft import context from daft.datatype import DataType +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + def test_monotonically_increasing_id_single_partition(make_df) -> None: data = {"a": [1, 2, 3, 4, 5]} diff --git a/tests/dataframe/test_pivot.py b/tests/dataframe/test_pivot.py index fcd88c9c51..232d8b0b45 100644 --- a/tests/dataframe/test_pivot.py +++ b/tests/dataframe/test_pivot.py @@ -1,5 +1,12 @@ import pytest +from daft import context + +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + @pytest.mark.parametrize("repartition_nparts", [1, 2, 5]) def test_pivot(make_df, repartition_nparts): diff --git a/tests/dataframe/test_repr.py b/tests/dataframe/test_repr.py index 8e04421901..d72082ca3b 100644 --- a/tests/dataframe/test_repr.py +++ b/tests/dataframe/test_repr.py @@ -8,8 +8,14 @@ from PIL import Image import daft +from daft import context from tests.utils import ANSI_ESCAPE, TD_STYLE, TH_STYLE +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + ROW_DIVIDER_REGEX = re.compile(r"╭─+┬*─*╮|├╌+┼*╌+┤") SHOWING_N_ROWS_REGEX = re.compile(r".*\(Showing first (\d+) of (\d+) rows\).*") UNMATERIALIZED_REGEX = re.compile(r".*\(No data to display: Dataframe not materialized\).*") diff --git a/tests/dataframe/test_sample.py b/tests/dataframe/test_sample.py index f59b6a4172..0e6ce9673a 100644 --- a/tests/dataframe/test_sample.py +++ b/tests/dataframe/test_sample.py @@ -2,6 +2,13 @@ import pytest +from daft import context + +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + def test_sample_fraction(make_df, valid_data: list[dict[str, float]]) -> None: df = make_df(valid_data) diff --git a/tests/dataframe/test_sort.py b/tests/dataframe/test_sort.py index 8a831a2bcf..e972c13831 100644 --- a/tests/dataframe/test_sort.py +++ b/tests/dataframe/test_sort.py @@ -5,9 +5,14 @@ import pyarrow as pa import pytest +from daft import context from daft.datatype import DataType from daft.errors import ExpressionTypeError +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) ### # Validation tests ### diff --git a/tests/dataframe/test_temporals.py b/tests/dataframe/test_temporals.py index aff8f540b5..8843028b01 100644 --- a/tests/dataframe/test_temporals.py +++ b/tests/dataframe/test_temporals.py @@ -9,7 +9,12 @@ import pytz import daft -from daft import DataType, col +from daft import DataType, col, context + +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) PYARROW_GE_7_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) >= (7, 0, 0) diff --git a/tests/dataframe/test_transform.py b/tests/dataframe/test_transform.py index a698b6e7fd..277c378bad 100644 --- a/tests/dataframe/test_transform.py +++ b/tests/dataframe/test_transform.py @@ -3,6 +3,12 @@ import pytest import daft +from daft import context + +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) def add_1(df): diff --git a/tests/dataframe/test_unpivot.py b/tests/dataframe/test_unpivot.py index b4c7a84cc5..e40edb0008 100644 --- a/tests/dataframe/test_unpivot.py +++ b/tests/dataframe/test_unpivot.py @@ -1,8 +1,13 @@ import pytest -from daft import col +from daft import col, context from daft.datatype import DataType +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + @pytest.mark.parametrize("n_partitions", [1, 2, 4]) def test_unpivot(make_df, n_partitions): diff --git a/tests/dataframe/test_wildcard.py b/tests/dataframe/test_wildcard.py index e732292c53..3497912be5 100644 --- a/tests/dataframe/test_wildcard.py +++ b/tests/dataframe/test_wildcard.py @@ -1,9 +1,14 @@ import pytest import daft -from daft import col +from daft import col, context from daft.exceptions import DaftCoreException +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + def test_wildcard_select(): df = daft.from_pydict( diff --git a/tests/integration/iceberg/conftest.py b/tests/integration/iceberg/conftest.py index 0ac80c7ccc..79a59e8ade 100644 --- a/tests/integration/iceberg/conftest.py +++ b/tests/integration/iceberg/conftest.py @@ -10,7 +10,6 @@ PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < (8, 0, 0) pytestmark = pytest.mark.skipif(PYARROW_LE_8_0_0, reason="iceberg writes only supported if pyarrow >= 8.0.0") - import tenacity from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.table import Table diff --git a/tests/io/delta_lake/conftest.py b/tests/io/delta_lake/conftest.py index fdd3d7ae4f..052d5a6d74 100644 --- a/tests/io/delta_lake/conftest.py +++ b/tests/io/delta_lake/conftest.py @@ -35,9 +35,15 @@ def num_partitions(request) -> int: pytest.param((lambda i: i * 1.5, "b"), id="float_partitioned"), pytest.param((lambda i: f"foo_{i}", "c"), id="string_partitioned"), pytest.param((lambda i: f"foo_{i}".encode(), "d"), id="string_partitioned"), - pytest.param((lambda i: datetime.datetime(2024, 2, i + 1), "f"), id="timestamp_partitioned"), + pytest.param( + (lambda i: datetime.datetime(2024, 2, i + 1), "f"), + id="timestamp_partitioned", + ), pytest.param((lambda i: datetime.date(2024, 2, i + 1), "g"), id="date_partitioned"), - pytest.param((lambda i: decimal.Decimal(str(1000 + i) + ".567"), "h"), id="decimal_partitioned"), + pytest.param( + (lambda i: decimal.Decimal(str(1000 + i) + ".567"), "h"), + id="decimal_partitioned", + ), pytest.param((lambda i: i if i % 2 == 0 else None, "a"), id="partitioned_with_nulls"), ] ) @@ -54,10 +60,22 @@ def base_table() -> pa.Table: "c": ["foo", "bar", "baz"], "d": [b"foo", b"bar", b"baz"], "e": [True, False, True], - "f": [datetime.datetime(2024, 2, 10), datetime.datetime(2024, 2, 11), datetime.datetime(2024, 2, 12)], - "g": [datetime.date(2024, 2, 10), datetime.date(2024, 2, 11), datetime.date(2024, 2, 12)], + "f": [ + datetime.datetime(2024, 2, 10), + datetime.datetime(2024, 2, 11), + datetime.datetime(2024, 2, 12), + ], + "g": [ + datetime.date(2024, 2, 10), + datetime.date(2024, 2, 11), + datetime.date(2024, 2, 12), + ], "h": pa.array( - [decimal.Decimal("1234.567"), decimal.Decimal("1233.456"), decimal.Decimal("1232.345")], + [ + decimal.Decimal("1234.567"), + decimal.Decimal("1233.456"), + decimal.Decimal("1232.345"), + ], type=pa.decimal128(7, 3), ), "i": [[1, 2, 3], [4, 5, 6], [7, 8, 9]], @@ -282,7 +300,11 @@ def s3_uri(tmp_path: pathlib.Path, data_dir: str) -> str: ], ) def s3_path( - request, s3_uri: str, aws_server: str, aws_credentials: dict[str, str], reset_aws: None + request, + s3_uri: str, + aws_server: str, + aws_credentials: dict[str, str], + reset_aws: None, ) -> tuple[str, daft.io.IOConfig, DataCatalogTable | None]: s3 = boto3.resource( "s3", @@ -417,13 +439,18 @@ def local_path(tmp_path: pathlib.Path, data_dir: str) -> tuple[str, None, None]: pytest.param(lazy_fixture("az_path"), marks=(pytest.mark.az, pytest.mark.integration)), ], ) -def cloud_paths(request) -> tuple[str, daft.io.IOConfig | None, DataCatalogTable | None]: +def cloud_paths( + request, +) -> tuple[str, daft.io.IOConfig | None, DataCatalogTable | None]: return request.param @pytest.fixture(scope="function") def deltalake_table( - cloud_paths, base_table: pa.Table, num_partitions: int, partition_generator: callable + cloud_paths, + base_table: pa.Table, + num_partitions: int, + partition_generator: callable, ) -> tuple[str, daft.io.IOConfig | None, dict[str, str], list[pa.Table]]: partition_generator, col = partition_generator path, io_config, catalog_table = cloud_paths diff --git a/tests/io/delta_lake/test_table_write.py b/tests/io/delta_lake/test_table_write.py index 63f9bd1881..6dbcf539fa 100644 --- a/tests/io/delta_lake/test_table_write.py +++ b/tests/io/delta_lake/test_table_write.py @@ -6,14 +6,26 @@ import pytest import daft +from daft import context from daft.io.object_store_options import io_config_to_storage_options from daft.logical.schema import Schema -PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < (8, 0, 0) +native_excutor_skip = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + +PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < ( + 8, + 0, + 0, +) PYTHON_LT_3_8 = sys.version_info[:2] < (3, 8) -pytestmark = pytest.mark.skipif( - PYARROW_LE_8_0_0 or PYTHON_LT_3_8, reason="deltalake only supported if pyarrow >= 8.0.0 and python >= 3.8" +py_version_or_arrow_skip = pytest.mark.skipif( + PYARROW_LE_8_0_0 or PYTHON_LT_3_8, + reason="deltalake only supported if pyarrow >= 8.0.0 and python >= 3.8", ) +pytestmark = [native_excutor_skip, py_version_or_arrow_skip] def test_deltalake_write_basic(tmp_path, base_table): diff --git a/tests/io/iceberg/test_iceberg_writes.py b/tests/io/iceberg/test_iceberg_writes.py index 35c2eaffa2..f15247953c 100644 --- a/tests/io/iceberg/test_iceberg_writes.py +++ b/tests/io/iceberg/test_iceberg_writes.py @@ -3,10 +3,18 @@ import pyarrow as pa import pytest +from daft import context + +native_excutor_skip = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + pyiceberg = pytest.importorskip("pyiceberg") PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < (8, 0, 0) -pytestmark = pytest.mark.skipif(PYARROW_LE_8_0_0, reason="iceberg only supported if pyarrow >= 8.0.0") +py_arrow_skip = pytest.mark.skipif(PYARROW_LE_8_0_0, reason="iceberg only supported if pyarrow >= 8.0.0") +pytestmark = [native_excutor_skip, py_arrow_skip] from pyiceberg.catalog.sql import SqlCatalog diff --git a/tests/io/lancedb/test_lancedb_reads.py b/tests/io/lancedb/test_lancedb_reads.py index ad3062ee19..b1f365f8c1 100644 --- a/tests/io/lancedb/test_lancedb_reads.py +++ b/tests/io/lancedb/test_lancedb_reads.py @@ -3,6 +3,12 @@ import pytest import daft +from daft import context + +native_executor_skip = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) TABLE_NAME = "my_table" data = { @@ -12,7 +18,8 @@ } PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < (8, 0, 0) -pytestmark = pytest.mark.skipif(PYARROW_LE_8_0_0, reason="lance only supported if pyarrow >= 8.0.0") +py_arrow_skip = pytest.mark.skipif(PYARROW_LE_8_0_0, reason="lance only supported if pyarrow >= 8.0.0") +pytestmark = [native_executor_skip, py_arrow_skip] @pytest.fixture(scope="function") diff --git a/tests/io/lancedb/test_lancedb_writes.py b/tests/io/lancedb/test_lancedb_writes.py index e9f1ce6fd7..bc4036e563 100644 --- a/tests/io/lancedb/test_lancedb_writes.py +++ b/tests/io/lancedb/test_lancedb_writes.py @@ -4,6 +4,12 @@ import pytest import daft +from daft import context + +native_executor_skip = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) TABLE_NAME = "my_table" data = { @@ -14,9 +20,10 @@ PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < (8, 0, 0) PY_LE_3_9_0 = sys.version_info < (3, 9) -pytestmark = pytest.mark.skipif( +py_version_and_arrow_skip = pytest.mark.skipif( PYARROW_LE_8_0_0 or PY_LE_3_9_0, reason="lance only supported if pyarrow >= 8.0.0 and python >= 3.9.0" ) +pytestmark = [native_executor_skip, py_version_and_arrow_skip] @pytest.fixture(scope="function") diff --git a/tests/io/test_csv_roundtrip.py b/tests/io/test_csv_roundtrip.py index dd288e6806..b9e8ccc9b8 100644 --- a/tests/io/test_csv_roundtrip.py +++ b/tests/io/test_csv_roundtrip.py @@ -7,8 +7,12 @@ import pytest import daft -from daft import DataType, TimeUnit +from daft import DataType, TimeUnit, context +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) PYARROW_GE_11_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) >= (11, 0, 0) diff --git a/tests/io/test_parquet.py b/tests/io/test_parquet.py index be2be34fca..c30ae8da1a 100644 --- a/tests/io/test_parquet.py +++ b/tests/io/test_parquet.py @@ -12,6 +12,7 @@ import pytest import daft +from daft import context from daft.daft import NativeStorageConfig, PythonStorageConfig, StorageConfig from daft.datatype import DataType, TimeUnit from daft.expressions import col @@ -20,6 +21,10 @@ from ..integration.io.conftest import minio_create_bucket +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) PYARROW_GE_11_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) >= (11, 0, 0) PYARROW_GE_13_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) >= (13, 0, 0) diff --git a/tests/io/test_parquet_roundtrip.py b/tests/io/test_parquet_roundtrip.py index 0dadaec51e..8448343abc 100644 --- a/tests/io/test_parquet_roundtrip.py +++ b/tests/io/test_parquet_roundtrip.py @@ -12,6 +12,13 @@ PYARROW_GE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) >= (8, 0, 0) +from daft import context + +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + @pytest.mark.skipif( not PYARROW_GE_8_0_0, diff --git a/tests/io/test_s3_credentials_refresh.py b/tests/io/test_s3_credentials_refresh.py index 16a98fadf0..25c5c0cd8c 100644 --- a/tests/io/test_s3_credentials_refresh.py +++ b/tests/io/test_s3_credentials_refresh.py @@ -10,8 +10,14 @@ import pytest import daft +from daft import context from tests.io.mock_aws_server import start_service, stop_process +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + @pytest.fixture(scope="session") def aws_log_file(tmp_path_factory: pytest.TempPathFactory) -> Iterator[io.IOBase]: diff --git a/tests/test_resource_requests.py b/tests/test_resource_requests.py index 9255b9163b..20a2aadf21 100644 --- a/tests/test_resource_requests.py +++ b/tests/test_resource_requests.py @@ -7,12 +7,17 @@ import ray import daft -from daft import udf +from daft import context, udf from daft.context import get_context from daft.daft import SystemInfo from daft.expressions import col from daft.internal.gpu import cuda_device_count +pytestmark = pytest.mark.skipif( + context.get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + def no_gpu_available() -> bool: return cuda_device_count() == 0