Skip to content

Commit

Permalink
[CHORE] Add native executor to CI (#2855)
Browse files Browse the repository at this point in the history
Enables native executor in CI (just unit tests for now). Any tests that
fail because the native executor doesn't support it yet is marked as
skip. The skips will be incrementally removed as functionality gets
added.

---------

Co-authored-by: Colin Ho <[email protected]>
  • Loading branch information
colin-ho and Colin Ho committed Sep 19, 2024
1 parent 6766955 commit 7666669
Show file tree
Hide file tree
Showing 36 changed files with 252 additions and 24 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ jobs:
python-version: ['3.8', '3.10']
daft-runner: [py, ray]
pyarrow-version: [7.0.0, 16.0.0]
enable-native-executor: [0, 1]
os: [ubuntu-20.04, windows-latest]
exclude:
- daft-runner: ray
enable-native-executor: 1
- daft-runner: ray
pyarrow-version: 7.0.0
os: ubuntu-20.04
Expand Down Expand Up @@ -115,6 +118,7 @@ jobs:
CARGO_TARGET_DIR: ./target

DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_NATIVE_EXECUTOR: ${{ matrix.enable-native-executor }}

- name: Build library and Test with pytest (Windows)
if: ${{ (runner.os == 'Windows') }}
Expand Down
6 changes: 6 additions & 0 deletions tests/cookbook/test_aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@
import pandas as pd
import pytest

from daft import context
from daft.datatype import DataType
from daft.expressions import col
from daft.udf import udf
from tests.conftest import assert_df_equals

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


def test_sum(daft_df, service_requests_csv_pd_df, repartition_nparts):
"""Sums across an entire column for the entire table"""
Expand Down
6 changes: 6 additions & 0 deletions tests/cookbook/test_joins.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@

import pytest

from daft import context
from daft.expressions import col
from tests.conftest import assert_df_equals

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


@pytest.mark.parametrize(
"join_strategy", [None, "hash", "sort_merge", "sort_merge_aligned_boundaries", "broadcast"], indirect=True
Expand Down
5 changes: 5 additions & 0 deletions tests/cookbook/test_pandas_cookbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@
import pytest

import daft
from daft import context
from daft.datatype import DataType
from daft.expressions import col, lit
from tests.conftest import assert_df_equals

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)
###
# Idioms: if-then
###
Expand Down
5 changes: 5 additions & 0 deletions tests/cookbook/test_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@
from pyarrow import dataset as pads

import daft
from daft import context
from tests.conftest import assert_df_equals
from tests.cookbook.assets import COOKBOOK_DATA_CSV

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)
PYARROW_GE_7_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) >= (7, 0, 0)


Expand Down
7 changes: 6 additions & 1 deletion tests/dataframe/test_aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@
import pytest

import daft
from daft import col
from daft import col, context
from daft.context import get_context
from daft.datatype import DataType
from daft.errors import ExpressionTypeError
from daft.utils import freeze
from tests.utils import sort_arrow_table

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


@pytest.mark.parametrize("repartition_nparts", [1, 2, 4])
def test_agg_global(make_df, repartition_nparts):
Expand Down
7 changes: 6 additions & 1 deletion tests/dataframe/test_approx_count_distinct.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
import pytest

import daft
from daft import col
from daft import col, context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)

TESTS = [
[[], 0],
Expand Down
7 changes: 6 additions & 1 deletion tests/dataframe/test_approx_percentiles_aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
import pyarrow as pa
import pytest

from daft import col
from daft import col, context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


@pytest.mark.parametrize("repartition_nparts", [1, 2, 4])
Expand Down
7 changes: 7 additions & 0 deletions tests/dataframe/test_concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

import pytest

from daft import context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


def test_simple_concat(make_df):
df1 = make_df({"foo": [1, 2, 3]})
Expand Down
6 changes: 6 additions & 0 deletions tests/dataframe/test_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@
import pytest

import daft
from daft import context
from daft.api_annotations import APITypeError
from daft.dataframe import DataFrame
from daft.datatype import DataType
from daft.utils import pyarrow_supports_fixed_shape_tensor
from tests.conftest import UuidType

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)

ARROW_VERSION = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric())


Expand Down
6 changes: 6 additions & 0 deletions tests/dataframe/test_decimals.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
import pytest

import daft
from daft import context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)

PYARROW_GE_7_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) >= (7, 0, 0)

Expand Down
6 changes: 6 additions & 0 deletions tests/dataframe/test_distinct.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@
import pyarrow as pa
import pytest

from daft import context
from daft.datatype import DataType
from tests.utils import sort_arrow_table

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


@pytest.mark.parametrize("repartition_nparts", [1, 2, 5])
def test_distinct_with_nulls(make_df, repartition_nparts):
Expand Down
6 changes: 6 additions & 0 deletions tests/dataframe/test_explode.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,14 @@
import pyarrow as pa
import pytest

from daft import context
from daft.expressions import col

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


@pytest.mark.parametrize(
"data",
Expand Down
6 changes: 6 additions & 0 deletions tests/dataframe/test_iter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
import pytest

import daft
from daft import context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


class MockException(Exception):
Expand Down
7 changes: 6 additions & 1 deletion tests/dataframe/test_joins.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
import pyarrow as pa
import pytest

from daft import col
from daft import col, context
from daft.datatype import DataType
from daft.errors import ExpressionTypeError
from tests.utils import sort_arrow_table

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


def skip_invalid_join_strategies(join_strategy, join_type):
if (join_strategy == "sort_merge" or join_strategy == "sort_merge_aligned_boundaries") and join_type != "inner":
Expand Down
6 changes: 6 additions & 0 deletions tests/dataframe/test_map_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
import pytest

import daft
from daft import context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


@pytest.mark.parametrize("repartition_nparts", [1, 2, 4])
Expand Down
6 changes: 6 additions & 0 deletions tests/dataframe/test_monotonically_increasing_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@

import pytest

from daft import context
from daft.datatype import DataType

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


def test_monotonically_increasing_id_single_partition(make_df) -> None:
data = {"a": [1, 2, 3, 4, 5]}
Expand Down
7 changes: 7 additions & 0 deletions tests/dataframe/test_pivot.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import pytest

from daft import context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


@pytest.mark.parametrize("repartition_nparts", [1, 2, 5])
def test_pivot(make_df, repartition_nparts):
Expand Down
6 changes: 6 additions & 0 deletions tests/dataframe/test_repr.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@
from PIL import Image

import daft
from daft import context
from tests.utils import ANSI_ESCAPE, TD_STYLE, TH_STYLE

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)

ROW_DIVIDER_REGEX = re.compile(r"╭─+┬*─*╮|├╌+┼*╌+┤")
SHOWING_N_ROWS_REGEX = re.compile(r".*\(Showing first (\d+) of (\d+) rows\).*")
UNMATERIALIZED_REGEX = re.compile(r".*\(No data to display: Dataframe not materialized\).*")
Expand Down
7 changes: 7 additions & 0 deletions tests/dataframe/test_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

import pytest

from daft import context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


def test_sample_fraction(make_df, valid_data: list[dict[str, float]]) -> None:
df = make_df(valid_data)
Expand Down
5 changes: 5 additions & 0 deletions tests/dataframe/test_sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
import pyarrow as pa
import pytest

from daft import context
from daft.datatype import DataType
from daft.errors import ExpressionTypeError

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)
###
# Validation tests
###
Expand Down
7 changes: 6 additions & 1 deletion tests/dataframe/test_temporals.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@
import pytz

import daft
from daft import DataType, col
from daft import DataType, col, context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)

PYARROW_GE_7_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) >= (7, 0, 0)

Expand Down
6 changes: 6 additions & 0 deletions tests/dataframe/test_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
import pytest

import daft
from daft import context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


def add_1(df):
Expand Down
7 changes: 6 additions & 1 deletion tests/dataframe/test_unpivot.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import pytest

from daft import col
from daft import col, context
from daft.datatype import DataType

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


@pytest.mark.parametrize("n_partitions", [1, 2, 4])
def test_unpivot(make_df, n_partitions):
Expand Down
7 changes: 6 additions & 1 deletion tests/dataframe/test_wildcard.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import pytest

import daft
from daft import col
from daft import col, context
from daft.exceptions import DaftCoreException

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


def test_wildcard_select():
df = daft.from_pydict(
Expand Down
1 change: 0 additions & 1 deletion tests/integration/iceberg/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < (8, 0, 0)
pytestmark = pytest.mark.skipif(PYARROW_LE_8_0_0, reason="iceberg writes only supported if pyarrow >= 8.0.0")


import tenacity
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.table import Table
Expand Down
Loading

0 comments on commit 7666669

Please sign in to comment.