Bump ray[data,default] from 2.4.0 to 2.5.1 (#1074)
* Adds the necessary fixes in both tests and code for the new version of Ray Datasets
* Extends the Ray compatibility workflow to test Daft against Ray 2.4 and 2.5
* Bumps requirements-dev.txt to use Ray 2.5 in the local development environment

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
dependabot[bot] and Jay Chia authored Jul 6, 2023
1 parent a4dbd62 commit 1e21665
Showing 4 changed files with 27 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ray-compatibility.yml
@@ -18,7 +18,7 @@ jobs:
fail-fast: false
matrix:
python-version: ['3.9']
-ray-version: [2.3.0, 2.2.0, 2.1.0, 2.0.0]
+ray-version: [2.5.1, 2.4.0, 2.3.0, 2.2.0, 2.1.0, 2.0.0]

steps:
- uses: actions/checkout@v3
4 changes: 4 additions & 0 deletions daft/runners/ray_runner.py
@@ -285,6 +285,10 @@ def partition_set_from_ray_dataset(
)
arrow_schema = ds.schema(fetch_if_missing=True)

+# Ray 2.5.0 broke the API by using its own `ray.data.dataset.Schema` instead of PyArrow schemas
+if RAY_VERSION >= (2, 5, 0):
+    arrow_schema = pa.schema({name: t for name, t in zip(arrow_schema.names, arrow_schema.types)})

daft_schema = Schema._from_field_name_and_types(
[(arrow_field.name, DataType.from_arrow_type(arrow_field.type)) for arrow_field in arrow_schema]
)
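For context, a minimal sketch (not part of the commit) of what the new branch does, assuming Ray >= 2.5 where `Dataset.schema()` returns Ray's own `ray.data.dataset.Schema` wrapper exposing `.names` and `.types` (PyArrow types) instead of a `pyarrow.Schema`:

import pyarrow as pa
import ray

ds = ray.data.from_items([{"x": 1, "y": "a"}])
wrapped = ds.schema()  # Ray >= 2.5: a `ray.data.dataset.Schema`, not a `pyarrow.Schema`
arrow_schema = pa.schema({name: t for name, t in zip(wrapped.names, wrapped.types)})
print(arrow_schema)  # a plain `pyarrow.Schema` again, usable by the Daft code above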
2 changes: 1 addition & 1 deletion requirements-dev.txt
@@ -29,7 +29,7 @@ Pillow==9.5.0
opencv-python==4.7.0.72

# Ray
-ray[data, default]==2.4.0
+ray[data, default]==2.5.1
pydantic<2 # pin pydantic because Ray uses broken APIs

# AWS
27 changes: 21 additions & 6 deletions tests/ray/test_datasets.py
@@ -30,6 +30,12 @@ def __eq__(self, other: Any) -> bool:
}


+def _row_to_pydict(row: ray.data.row.TableRow | dict) -> dict:
+    if isinstance(row, dict):
+        return row
+    return row.as_pydict()


@pytest.mark.skipif(get_context().runner_config.name != "ray", reason="Needs to run on Ray runner")
@pytest.mark.parametrize("n_partitions", [1, 2])
def test_to_ray_dataset_all_arrow(n_partitions: int):
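A hedged illustration of why the `_row_to_pydict` helper above is needed, inferred from the version gates in this commit: on Ray >= 2.5, `Dataset.iter_rows()` yields plain dicts, while older versions yield `TableRow` objects that must be converted with `.as_pydict()`. Within the test module's scope:

import ray

ds = ray.data.from_items([{"intcol": 1, "strcol": "a"}])
for row in ds.iter_rows():
    # Ray >= 2.5 yields dicts; Ray < 2.5 yields TableRow objects
    print(_row_to_pydict(row))  # {'intcol': 1, 'strcol': 'a'} on either version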
@@ -43,7 +49,7 @@ def test_to_ray_dataset_all_arrow(n_partitions: int):
elif RAY_VERSION >= (2, 0, 0):
assert ds._dataset_format() == "arrow", "Ray Dataset format should be arrow"

-rows = sorted([row.as_pydict() for row in ds.iter_rows()], key=lambda r: r["intcol"])
+rows = sorted([_row_to_pydict(row) for row in ds.iter_rows()], key=lambda r: r["intcol"])
assert rows == sorted(
[
{"intcol": 1, "strcol": "a", "floatcol": 1.0},
@@ -55,6 +61,9 @@


@pytest.mark.skipif(get_context().runner_config.name != "ray", reason="Needs to run on Ray runner")
+@pytest.mark.skipif(
+    RAY_VERSION >= (2, 5, 0), reason="Ray Datasets versions >= 2.5.0 no longer support Python objects as rows"
+)
@pytest.mark.parametrize("n_partitions", [1, 2])
def test_to_ray_dataset_with_py(n_partitions: int):
df = daft.from_pydict(DATA).repartition(n_partitions)
@@ -95,7 +104,7 @@ def test_to_ray_dataset_with_numpy(n_partitions: int):
ds._dataset_format() == "arrow"
), "Ray Dataset format should be arrow because it uses a Tensor extension type"

-rows = sorted([row.as_pydict() for row in ds.iter_rows()], key=lambda r: r["intcol"])
+rows = sorted([_row_to_pydict(row) for row in ds.iter_rows()], key=lambda r: r["intcol"])
np.testing.assert_equal(
rows,
sorted(
@@ -127,7 +136,7 @@ def test_to_ray_dataset_with_numpy_variable_shaped(n_partitions: int):
ds._dataset_format() == "simple"
), "In old versions of Ray, we drop down to `simple` format because ArrowTensorType is not compatible with ragged tensors"

-rows = sorted([row.as_pydict() for row in ds.iter_rows()], key=lambda r: r["intcol"])
+rows = sorted([_row_to_pydict(row) for row in ds.iter_rows()], key=lambda r: r["intcol"])
np.testing.assert_equal(
rows,
sorted(
@@ -180,15 +189,21 @@ def test_from_ray_dataset_simple(n_partitions: int):
df = daft.from_ray_dataset(ds)
# Sort data since partition ordering in Datasets is not deterministic.
out = df.to_pydict()
-assert list(out.keys()) == ["value"]
-assert sorted(out["value"]) == list(range(8))
+key = "id" if RAY_VERSION >= (2, 5, 0) else "value"
+assert list(out.keys()) == [key]
+assert sorted(out[key]) == list(range(8))


@pytest.mark.skipif(get_context().runner_config.name != "ray", reason="Needs to run on Ray runner")
@pytest.mark.parametrize("n_partitions", [1, 2])
def test_from_ray_dataset_tensor(n_partitions: int):
ds = ray.data.range(8)
-ds = ds.map(lambda i: {"int": i, "np": np.ones((3, 3))}).repartition(n_partitions)
+ds = (
+    ds.map(lambda d: {"int": d["id"], "np": np.ones((3, 3))})
+    if RAY_VERSION >= (2, 5, 0)
+    else ds.map(lambda i: {"int": i, "np": np.ones((3, 3))})
+)
+ds = ds.repartition(n_partitions)

df = daft.from_ray_dataset(ds)
out = df.to_pydict()
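Two asides on the hunk above, both sketches assuming Ray >= 2.5 semantics rather than definitive API documentation. First, the `key = "id" if ...` change reflects Ray 2.5 renaming the column produced by `ray.data.range` (older versions surfaced the data under "value", per the test's else branch):

import ray

print(ray.data.range(3).take(3))  # Ray >= 2.5: [{'id': 0}, {'id': 1}, {'id': 2}]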
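Second, the reworked `ds.map` call in `test_from_ray_dataset_tensor` reflects that `Dataset.map` UDFs on Ray >= 2.5 receive each row as a dict and must return a dict, hence the `d["id"]` unpacking:

import ray

ds = ray.data.range(2).map(lambda d: {"doubled": d["id"] * 2})
print(ds.take(2))  # Ray >= 2.5: [{'doubled': 0}, {'doubled': 2}]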
