From d7d1ae0f21e4a13b6b4a08d9ba8c14fe18d5f17e Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Sun, 24 Sep 2023 21:20:55 -0700 Subject: [PATCH] [FEAT] Add support for windows in daft (#1386) * Implements fixes for daft to be able to build for windows * implements protocol checking in filesystem.py for windows style paths * implements protocol checking in native local objectio for windows style paths. * fix unit tests to use os libraries to construct path * rewrite file io unit tests to not use a open file descriptor which breaks on windows * disable AVX for x86 builds for mac and windows * implement unit test, rust tests and import tests CI for windows * implements windows wheel building for publish pipeline * disables query plan toggle for CI * fix REPR style unit tests for `\r` --- .github/workflows/python-package.yml | 91 ++++++--- .github/workflows/python-publish.yml | 31 +-- daft/filesystem.py | 3 + requirements-dev.txt | 4 +- src/daft-io/src/lib.rs | 4 + src/lib.rs | 1 + tests/cookbook/test_dataloading.py | 36 ++-- tests/dataframe/test_creation.py | 271 +++++++++++++++------------ tests/dataframe/test_repr.py | 2 +- tests/integration/test_tpch.py | 4 + tests/series/test_concat.py | 6 +- tests/series/test_tensor.py | 2 +- tests/table/table_io/test_parquet.py | 8 +- tests/test_schema.py | 2 +- tests/udf_library/test_url_udfs.py | 7 +- 15 files changed, 285 insertions(+), 187 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 979ae9a1a5..74c40cb06f 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -14,21 +14,25 @@ env: jobs: unit-tests-with-coverage: - runs-on: ubuntu-latest - timeout-minutes: 15 + runs-on: ${{ matrix.os }}-latest + timeout-minutes: 30 strategy: fail-fast: false matrix: python-version: ['3.7', '3.10'] daft-runner: [py, ray] - new-query-planner: [1, 0] pyarrow-version: [6.0.1, 12.0] + os: [ubuntu, windows] exclude: - daft-runner: ray pyarrow-version: 6.0.1 + os: ubuntu - daft-runner: py python-version: '3.10' pyarrow-version: 6.0.1 + os: ubuntu + - os: windows + pyarrow-version: 12.0 steps: - uses: actions/checkout@v4 @@ -53,15 +57,24 @@ jobs: echo "$GITHUB_WORKSPACE/venv/bin" >> $GITHUB_PATH - name: Install dependencies + if: ${{ (matrix.os != 'windows') }} run: | pip install --upgrade pip pip install -r requirements-dev.txt + - name: Install dependencies (Windows) + if: ${{ (matrix.os == 'windows') }} + run: | + .\venv\Scripts\activate + python -m pip install --upgrade pip + python -m pip install -r requirements-dev.txt + - name: Override pyarrow - if: ${{ matrix.pyarrow-version }} + if: ${{ (matrix.pyarrow-version) && (matrix.os != 'windows') }} run: pip install pyarrow==${{ matrix.pyarrow-version }} - - name: Build library and Test with pytest + - name: Build library and Test with pytest (unix) + if: ${{ (matrix.os != 'windows') }} run: | source activate # source <(cargo llvm-cov show-env --export-prefix) @@ -71,11 +84,25 @@ jobs: maturin develop mkdir -p report-output && pytest --cov=daft --ignore tests/integration --durations=50 coverage combine -a --data-file='.coverage' || true - coverage xml -o ./report-output/coverage-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.daft-runner }}-${{ matrix.pyarrow-version }}-${{ matrix.new-query-planner }}.xml - # cargo llvm-cov --no-run --lcov --output-path report-output/rust-coverage-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.daft-runner }}-${{ matrix.pyarrow-version }}-${{ matrix.new-query-planner 
}}.lcov + coverage xml -o ./report-output/coverage-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.daft-runner }}-${{ matrix.pyarrow-version }}.xml + # cargo llvm-cov --no-run --lcov --output-path report-output/rust-coverage-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.daft-runner }}-${{ matrix.pyarrow-version }}.lcov + env: + DAFT_RUNNER: ${{ matrix.daft-runner }} + - name: Build library and Test with pytest (windows) + if: ${{ (matrix.os == 'windows') }} + run: | + .\venv\Scripts\activate + # source <(cargo llvm-cov show-env --export-prefix) + # export CARGO_TARGET_DIR=$CARGO_LLVM_COV_TARGET_DIR + # export CARGO_INCREMENTAL=1 + # cargo llvm-cov clean --workspace + maturin develop + mkdir -p report-output && pytest --cov=daft --ignore tests\integration --durations=50 + coverage combine -a --data-file='.coverage' || true + coverage xml -o .\report-output\coverage-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.daft-runner }}-${{ matrix.pyarrow-version }}.xml + # cargo llvm-cov --no-run --lcov --output-path report-output\rust-coverage-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.daft-runner }}-${{ matrix.pyarrow-version }}.lcov env: DAFT_RUNNER: ${{ matrix.daft-runner }} - DAFT_NEW_QUERY_PLANNER: ${{ matrix.new-query-planner }} - name: Upload coverage report uses: actions/upload-artifact@v3 @@ -151,7 +178,6 @@ jobs: matrix: python-version: ['3.7'] daft-runner: [py, ray] - new-query-planner: [1, 0] steps: - uses: actions/checkout@v4 with: @@ -186,7 +212,6 @@ jobs: pytest tests/integration/test_tpch.py --durations=50 env: DAFT_RUNNER: ${{ matrix.daft-runner }} - DAFT_NEW_QUERY_PLANNER: ${{ matrix.new-query-planner }} - name: Send Slack notification on failure uses: slackapi/slack-github-action@v1.24.0 if: ${{ failure() && (github.ref == 'refs/heads/main') }} @@ -219,7 +244,6 @@ jobs: matrix: python-version: ['3.8'] # can't use 3.7 due to requiring anon mode for adlfs daft-runner: [py, ray] - new-query-planner: [1, 0] # These permissions are needed to interact with GitHub's OIDC Token endpoint. 
# This is used in the step "Assume GitHub Actions AWS Credentials" permissions: @@ -276,7 +300,6 @@ jobs: pytest tests/integration/io -m 'integration' --durations=50 env: DAFT_RUNNER: ${{ matrix.daft-runner }} - DAFT_NEW_QUERY_PLANNER: ${{ matrix.new-query-planner }} - name: Send Slack notification on failure uses: slackapi/slack-github-action@v1.24.0 if: ${{ failure() && (github.ref == 'refs/heads/main') }} @@ -298,10 +321,12 @@ jobs: SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK rust-tests: - runs-on: ubuntu-latest - timeout-minutes: 15 + runs-on: ${{ matrix.os }}-latest + timeout-minutes: 30 strategy: fail-fast: false + matrix: + os: [ubuntu, windows] steps: - uses: actions/checkout@v4 - uses: moonrepo/setup-rust@v0 @@ -379,13 +404,13 @@ jobs: SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK test-imports: - runs-on: ubuntu-latest - timeout-minutes: 15 + runs-on: ${{ matrix.os }}-latest + timeout-minutes: 30 strategy: fail-fast: false matrix: + os: [ubuntu, windows] python-version: ['3.7'] - steps: - uses: actions/checkout@v4 - uses: moonrepo/setup-rust@v0 @@ -401,25 +426,43 @@ jobs: with: python-version: ${{ matrix.python-version }} - - name: Setup Virtual Env + - name: Unix Build + if: ${{ (matrix.os != 'windows') }} run: | python -m venv venv source venv/bin/activate - pip install maturin + python -m pip install maturin + maturin build --out dist - - name: Build Rust Library + - name: Windows Build + if: ${{ (matrix.os == 'windows') }} run: | - venv/bin/maturin build --out dist + python -m venv venv + .\venv\Scripts\activate + python -m pip install maturin + maturin build --out dist - - name: Test Imports in Clean Env + - name: Test Imports in Clean Env (Unix) + if: ${{ (matrix.os != 'windows') }} run: | rm -rf daft rm -rf venv python -m venv venv source venv/bin/activate ls -R ./dist - venv/bin/pip install dist/*.whl - venv/bin/python -c 'import daft; from daft import *' + pip install dist/*.whl + python -c 'import daft; from daft import *' + + - name: Test Imports in Clean Env (Windows) + if: ${{ (matrix.os == 'windows') }} + run: | + rd -r daft + rd -r venv + python -m venv venv + .\venv\Scripts\activate + $FILES = Get-ChildItem -Path .\dist\*.whl -Force -Recurse + python -m pip install $FILES[0].FullName + python -c 'import daft; from daft import *' - name: Send Slack notification on failure uses: slackapi/slack-github-action@v1.24.0 diff --git a/.github/workflows/python-publish.yml b/.github/workflows/python-publish.yml index 258caab05c..050844c9e2 100644 --- a/.github/workflows/python-publish.yml +++ b/.github/workflows/python-publish.yml @@ -14,10 +14,9 @@ on: tags: - v* workflow_dispatch: - env: PACKAGE_NAME: getdaft - PYTHON_VERSION: 3.7.16 # to build abi3 wheels + PYTHON_VERSION: 3.8 DAFT_ANALYTICS_ENABLED: '0' IS_PUSH: ${{ github.event_name == 'push' && startsWith(github.ref, 'refs/tags/v') && ( ! 
endsWith(github.ref, 'dev0')) }} @@ -32,8 +31,11 @@ jobs: strategy: fail-fast: false matrix: - runs: [ubuntu-latest, macos-latest-xl] + runs: [ubuntu-latest, macos-latest-xl, windows-latest-l] compile_arch: [x86_64, aarch64] + exclude: + - runs: windows-latest-l + compile_arch: aarch64 steps: - uses: actions/checkout@v4 with: @@ -45,16 +47,14 @@ jobs: architecture: x64 - run: pip install -U twine toml - run: python tools/patch_package_version.py - - uses: moonrepo/setup-rust@v0 - - name: Build wheels - Mac x86 - if: ${{ (matrix.runs == 'macos-latest-xl') && (matrix.compile_arch == 'x86_64') }} + - name: Build wheels - Mac and Windows x86 + if: ${{ ((matrix.runs == 'macos-latest-xl') || (matrix.runs == 'windows-latest-l')) && (matrix.compile_arch == 'x86_64') }} uses: messense/maturin-action@v1 with: target: x86_64 - manylinux: auto args: --profile release-lto --out dist --sdist env: - RUSTFLAGS: -C target-feature=+fxsr,+sse,+sse2,+sse3,+ssse3,+sse4.1,+sse4.2,+popcnt,+avx,+fma + RUSTFLAGS: -C target-feature=+fxsr,+sse,+sse2,+sse3,+ssse3,+sse4.1,+sse4.2 - name: Build wheels - Linux x86 if: ${{ (matrix.runs == 'ubuntu-latest') && (matrix.compile_arch == 'x86_64') }} uses: messense/maturin-action@v1 @@ -86,13 +86,21 @@ jobs: env: RUSTFLAGS: -Ctarget-cpu=apple-m1 - - name: Install and test built wheel - x86_64 - if: ${{ matrix.compile_arch == 'x86_64' }} + - name: Install and test built wheel - Linux and Mac x86_64 + if: ${{ ((matrix.runs == 'macos-latest-xl') || (matrix.runs == 'ubuntu-latest')) && (matrix.compile_arch == 'x86_64') }} run: | pip install -r requirements-dev.txt dist/${{ env.PACKAGE_NAME }}-*x86_64*.whl --force-reinstall rm -rf daft pytest -v + - name: Install and test built wheel - Windows x86_64 + if: ${{ (matrix.runs == 'windows-latest-l') && (matrix.compile_arch == 'x86_64') }} + run: | + $FILES = Get-ChildItem -Path .\dist\${{ env.PACKAGE_NAME }}-*-win_amd64.whl -Force -Recurse + pip install -r requirements-dev.txt $FILES[0].FullName --force-reinstall + rd -r daft + pytest -v + - name: Upload wheels uses: actions/upload-artifact@v3 with: @@ -101,7 +109,7 @@ jobs: - name: Send Slack notification on failure uses: slackapi/slack-github-action@v1.24.0 - if: failure() + if: ${{ failure() && (github.ref == 'refs/heads/main') }} with: payload: | { @@ -122,6 +130,7 @@ jobs: publish: name: Publish wheels to PYPI and Anaconda + if: ${{ (github.ref == 'refs/heads/main') }} runs-on: ubuntu-latest needs: - build-and-test diff --git a/daft/filesystem.py b/daft/filesystem.py index eb56e02a0a..b675c45ee4 100644 --- a/daft/filesystem.py +++ b/daft/filesystem.py @@ -101,8 +101,11 @@ def get_filesystem(protocol: str, **kwargs) -> fsspec.AbstractFileSystem: def get_protocol_from_path(path: str) -> str: parsed_scheme = urllib.parse.urlparse(path, allow_fragments=False).scheme + parsed_scheme = parsed_scheme.lower() if parsed_scheme == "" or parsed_scheme is None: return "file" + if sys.platform == "win32" and len(parsed_scheme) == 1 and ("a" <= parsed_scheme) and (parsed_scheme <= "z"): + return "file" return parsed_scheme diff --git a/requirements-dev.txt b/requirements-dev.txt index 5d03f619bd..dce5205005 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -29,8 +29,8 @@ Pillow==9.5.0 opencv-python==4.8.0.76 # Pyarrow -pyarrow==12 - +pyarrow==12; platform_system != "Windows" +pyarrow==6.0.1; platform_system === "Windows" # Ray ray[data, default]==2.6.3 pydantic<2 # pin pydantic because Ray uses broken APIs diff --git a/src/daft-io/src/lib.rs b/src/daft-io/src/lib.rs index 
e83a9768a9..8c226b5255 100644 --- a/src/daft-io/src/lib.rs +++ b/src/daft-io/src/lib.rs @@ -250,6 +250,10 @@ fn parse_url(input: &str) -> Result<(SourceType, Cow<'_, str>)> { "s3" => Ok((SourceType::S3, fixed_input)), "az" | "abfs" => Ok((SourceType::AzureBlob, fixed_input)), "gcs" | "gs" => Ok((SourceType::GCS, fixed_input)), + #[cfg(target_env = "msvc")] + _ if scheme.len() == 1 && ("a" <= scheme.as_str() && (scheme.as_str() <= "z")) => { + Ok((SourceType::File, Cow::Owned(format!("file://{input}")))) + } _ => Err(Error::NotImplementedSource { store: scheme }), } } diff --git a/src/lib.rs b/src/lib.rs index 2339334461..bafa6c293e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ use tikv_jemallocator::Jemalloc; #[global_allocator] static GLOBAL: Jemalloc = Jemalloc; +#[cfg(not(target_env = "msvc"))] union U { x: &'static u8, y: &'static libc::c_char, diff --git a/tests/cookbook/test_dataloading.py b/tests/cookbook/test_dataloading.py index 7238ba9ad2..655c3acbd6 100644 --- a/tests/cookbook/test_dataloading.py +++ b/tests/cookbook/test_dataloading.py @@ -1,6 +1,8 @@ from __future__ import annotations +import os import pathlib +import sys from unittest.mock import patch import pandas as pd @@ -81,10 +83,11 @@ def test_glob_files(tmpdir): bar_filepath = pathlib.Path(tmpdir) / f"file_{i}.bar" bar_filepath.write_text("b" * i) - daft_df = daft.from_glob_path(f"{tmpdir}/*.foo") + daft_df = daft.from_glob_path(os.path.join(tmpdir, "*.foo")) daft_pd_df = daft_df.to_pandas() + pd_df = pd.DataFrame.from_records( - {"path": str(path), "size": size, "num_rows": None} for path, size in zip(filepaths, list(range(10))) + {"path": str(path.as_posix()), "size": size, "num_rows": None} for path, size in zip(filepaths, list(range(10))) ) pd_df = pd_df[~pd_df["path"].str.endswith(".bar")] pd_df = pd_df.astype({"num_rows": float}) @@ -94,7 +97,7 @@ def test_glob_files(tmpdir): def test_glob_files_single_file(tmpdir): filepath = pathlib.Path(tmpdir) / f"file.foo" filepath.write_text("b" * 10) - daft_df = daft.from_glob_path(f"{tmpdir}/file.foo") + daft_df = daft.from_glob_path(os.path.join(tmpdir, "file.foo")) daft_pd_df = daft_df.to_pandas() pd_df = pd.DataFrame.from_records([{"path": str(filepath), "size": 10, "num_rows": None}]) pd_df = pd_df.astype({"num_rows": float}) @@ -115,15 +118,17 @@ def test_glob_files_directory(tmpdir): daft_pd_df = daft_df.to_pandas() listing_records = [ - {"path": str(path), "size": size, "num_rows": None} + {"path": str(path.as_posix()), "size": size, "num_rows": None} for path, size in zip(filepaths, [i for i in range(10) for _ in range(2)]) ] - listing_records = listing_records + [ - {"path": str(extra_empty_dir), "size": extra_empty_dir.stat().st_size, "num_rows": None} - ] + + dir_size = extra_empty_dir.stat().st_size + if sys.platform == "win32": + dir_size = 0 + + listing_records = listing_records + [{"path": str(extra_empty_dir.as_posix()), "size": dir_size, "num_rows": None}] pd_df = pd.DataFrame.from_records(listing_records) pd_df = pd_df.astype({"num_rows": float}) - assert_df_equals(daft_pd_df, pd_df, sort_key="path") @@ -137,16 +142,17 @@ def test_glob_files_recursive(tmpdir): filepath.write_text("a" * i) paths.append(filepath) - daft_df = daft.from_glob_path(f"{tmpdir}/**") + daft_df = daft.from_glob_path(os.path.join(tmpdir, "**")) daft_pd_df = daft_df.to_pandas() - listing_records = [ - {"path": str(path), "size": size, "num_rows": None} + {"path": str(path.as_posix()), "size": size, "num_rows": None} for path, size in zip(paths, [i for i in range(10) for 
_ in range(2)]) ] - listing_records = listing_records + [ - {"path": str(nested_dir_path), "size": nested_dir_path.stat().st_size, "num_rows": None} - ] + dir_size = nested_dir_path.stat().st_size + if sys.platform == "win32": + dir_size = 0 + + listing_records = listing_records + [{"path": str(nested_dir_path.as_posix()), "size": dir_size, "num_rows": None}] pd_df = pd.DataFrame.from_records(listing_records) pd_df = pd_df.astype({"num_rows": float}) @@ -175,7 +181,7 @@ def test_glob_files_custom_fs(tmpdir): daft_pd_df = daft_df.to_pandas() pd_df = pd.DataFrame.from_records( - {"path": str(path), "size": size, "num_rows": None} for path, size in zip(filepaths, list(range(10))) + {"path": str(path.as_posix()), "size": size, "num_rows": None} for path, size in zip(filepaths, list(range(10))) ) pd_df = pd_df[~pd_df["path"].str.endswith(".bar")] pd_df = pd_df.astype({"num_rows": float}) diff --git a/tests/dataframe/test_creation.py b/tests/dataframe/test_creation.py index ab2207bb2f..d541dbb64f 100644 --- a/tests/dataframe/test_creation.py +++ b/tests/dataframe/test_creation.py @@ -1,8 +1,10 @@ from __future__ import annotations +import contextlib import csv import decimal import json +import os import tempfile import uuid from unittest.mock import MagicMock, patch @@ -321,7 +323,7 @@ def test_create_dataframe_pandas_tensor(valid_data: list[dict[str, float]]) -> N id="arrow_struct", ), pytest.param( - [np.array([1]), np.array([2]), np.array([3])], + [np.array([1], dtype=np.int64), np.array([2], dtype=np.int64), np.array([3], dtype=np.int64)], DataType.list(DataType.int64()), id="numpy_1d_arrays", ), @@ -348,17 +350,22 @@ def test_load_pydict_types(data, expected_dtype): ### # CSV tests ### +@contextlib.contextmanager +def create_temp_filename() -> str: + with tempfile.TemporaryDirectory() as dir: + yield os.path.join(dir, "tempfile") def test_create_dataframe_csv(valid_data: list[dict[str, float]]) -> None: - with tempfile.NamedTemporaryFile("w") as f: - header = list(valid_data[0].keys()) - writer = csv.writer(f) - writer.writerow(header) - writer.writerows([[item[col] for col in header] for item in valid_data]) - f.flush() - - df = daft.read_csv(f.name) + with create_temp_filename() as fname: + with open(fname, "w") as f: + header = list(valid_data[0].keys()) + writer = csv.writer(f) + writer.writerow(header) + writer.writerows([[item[col] for col in header] for item in valid_data]) + f.flush() + + df = daft.read_csv(fname) assert df.column_names == COL_NAMES pd_df = df.to_pandas() @@ -367,15 +374,16 @@ def test_create_dataframe_csv(valid_data: list[dict[str, float]]) -> None: def test_create_dataframe_multiple_csvs(valid_data: list[dict[str, float]]) -> None: - with tempfile.NamedTemporaryFile("w") as f1, tempfile.NamedTemporaryFile("w") as f2: - for f in (f1, f2): - header = list(valid_data[0].keys()) - writer = csv.writer(f) - writer.writerow(header) - writer.writerows([[item[col] for col in header] for item in valid_data]) - f.flush() - - df = daft.read_csv([f1.name, f2.name]) + with create_temp_filename() as f1name, create_temp_filename() as f2name: + with open(f1name, "w") as f1, open(f2name, "w") as f2: + for f in (f1, f2): + header = list(valid_data[0].keys()) + writer = csv.writer(f) + writer.writerow(header) + writer.writerows([[item[col] for col in header] for item in valid_data]) + f.flush() + + df = daft.read_csv([f1name, f2name]) assert df.column_names == COL_NAMES pd_df = df.to_pandas() @@ -388,12 +396,13 @@ def test_create_dataframe_multiple_csvs(valid_data: list[dict[str, 
float]]) -> N reason="requires PyRunner to be in use", ) def test_create_dataframe_csv_custom_fs(valid_data: list[dict[str, float]]) -> None: - with tempfile.NamedTemporaryFile("w") as f: - header = list(valid_data[0].keys()) - writer = csv.writer(f) - writer.writerow(header) - writer.writerows([[item[col] for col in header] for item in valid_data]) - f.flush() + with create_temp_filename() as fname: + with open(fname, "w") as f: + header = list(valid_data[0].keys()) + writer = csv.writer(f) + writer.writerow(header) + writer.writerows([[item[col] for col in header] for item in valid_data]) + f.flush() # Mark that this filesystem instance shouldn't be automatically reused by fsspec; without this, # fsspec would cache this instance and reuse it for Daft's default construction of filesystems, @@ -403,7 +412,7 @@ def test_create_dataframe_csv_custom_fs(valid_data: list[dict[str, float]]) -> N with patch.object(fs, "info", wraps=fs.info) as mock_info, patch.object( fs, "open", wraps=fs.open ) as mock_open, patch("daft.filesystem._get_fs_from_cache", mock_cache): - df = daft.read_csv(f.name, fs=fs) + df = daft.read_csv(fname, fs=fs) # Check that info() is called on the passed filesystem. mock_info.assert_called() @@ -418,14 +427,15 @@ def test_create_dataframe_csv_custom_fs(valid_data: list[dict[str, float]]) -> N def test_create_dataframe_csv_generate_headers(valid_data: list[dict[str, float]]) -> None: - with tempfile.NamedTemporaryFile("w") as f: - header = list(valid_data[0].keys()) - writer = csv.writer(f) - writer.writerows([[item[col] for col in header] for item in valid_data]) - f.flush() + with create_temp_filename() as fname: + with open(fname, "w") as f: + header = list(valid_data[0].keys()) + writer = csv.writer(f) + writer.writerows([[item[col] for col in header] for item in valid_data]) + f.flush() cnames = [f"f{i}" for i in range(5)] - df = daft.read_csv(f.name, has_headers=False) + df = daft.read_csv(fname, has_headers=False) assert df.column_names == cnames pd_df = df.to_pandas() @@ -434,16 +444,17 @@ def test_create_dataframe_csv_generate_headers(valid_data: list[dict[str, float] def test_create_dataframe_csv_column_projection(valid_data: list[dict[str, float]]) -> None: - with tempfile.NamedTemporaryFile("w") as f: - header = list(valid_data[0].keys()) - writer = csv.writer(f) - writer.writerow(header) - writer.writerows([[item[col] for col in header] for item in valid_data]) - f.flush() + with create_temp_filename() as fname: + with open(fname, "w") as f: + header = list(valid_data[0].keys()) + writer = csv.writer(f) + writer.writerow(header) + writer.writerows([[item[col] for col in header] for item in valid_data]) + f.flush() col_subset = COL_NAMES[:3] - df = daft.read_csv(f.name) + df = daft.read_csv(fname) df = df.select(*col_subset) assert df.column_names == col_subset @@ -453,14 +464,15 @@ def test_create_dataframe_csv_column_projection(valid_data: list[dict[str, float def test_create_dataframe_csv_custom_delimiter(valid_data: list[dict[str, float]]) -> None: - with tempfile.NamedTemporaryFile("w") as f: - header = list(valid_data[0].keys()) - writer = csv.writer(f, delimiter="\t") - writer.writerow(header) - writer.writerows([[item[col] for col in header] for item in valid_data]) - f.flush() - - df = daft.read_csv(f.name, delimiter="\t") + with create_temp_filename() as fname: + with open(fname, "w") as f: + header = list(valid_data[0].keys()) + writer = csv.writer(f, delimiter="\t") + writer.writerow(header) + writer.writerows([[item[col] for col in header] for item 
in valid_data]) + f.flush() + + df = daft.read_csv(fname, delimiter="\t") assert df.column_names == COL_NAMES pd_df = df.to_pandas() @@ -469,15 +481,16 @@ def test_create_dataframe_csv_custom_delimiter(valid_data: list[dict[str, float] def test_create_dataframe_csv_specify_schema(valid_data: list[dict[str, float]]) -> None: - with tempfile.NamedTemporaryFile("w") as f: - header = list(valid_data[0].keys()) - writer = csv.writer(f, delimiter="\t") - writer.writerow(header) - writer.writerows([[item[col] for col in header] for item in valid_data]) - f.flush() + with create_temp_filename() as fname: + with open(fname, "w") as f: + header = list(valid_data[0].keys()) + writer = csv.writer(f, delimiter="\t") + writer.writerow(header) + writer.writerows([[item[col] for col in header] for item in valid_data]) + f.flush() df = daft.read_csv( - f.name, + fname, delimiter="\t", schema_hints={ "sepal_length": DataType.float32(), @@ -495,14 +508,15 @@ def test_create_dataframe_csv_specify_schema(valid_data: list[dict[str, float]]) def test_create_dataframe_csv_specify_schema_no_headers(valid_data: list[dict[str, float]]) -> None: - with tempfile.NamedTemporaryFile("w") as f: - header = list(valid_data[0].keys()) - writer = csv.writer(f, delimiter="\t") - writer.writerows([[item[col] for col in header] for item in valid_data]) - f.flush() + with create_temp_filename() as fname: + with open(fname, "w") as f: + header = list(valid_data[0].keys()) + writer = csv.writer(f, delimiter="\t") + writer.writerows([[item[col] for col in header] for item in valid_data]) + f.flush() df = daft.read_csv( - f.name, + fname, delimiter="\t", schema_hints={ "sepal_length": DataType.float64(), @@ -526,13 +540,14 @@ def test_create_dataframe_csv_specify_schema_no_headers(valid_data: list[dict[st def test_create_dataframe_json(valid_data: list[dict[str, float]]) -> None: - with tempfile.NamedTemporaryFile("w") as f: - for data in valid_data: - f.write(json.dumps(data)) - f.write("\n") - f.flush() + with create_temp_filename() as fname: + with open(fname, "w") as f: + for data in valid_data: + f.write(json.dumps(data)) + f.write("\n") + f.flush() - df = daft.read_json(f.name) + df = daft.read_json(fname) assert df.column_names == COL_NAMES pd_df = df.to_pandas() @@ -541,14 +556,15 @@ def test_create_dataframe_json(valid_data: list[dict[str, float]]) -> None: def test_create_dataframe_multiple_jsons(valid_data: list[dict[str, float]]) -> None: - with tempfile.NamedTemporaryFile("w") as f1, tempfile.NamedTemporaryFile("w") as f2: - for f in (f1, f2): - for data in valid_data: - f.write(json.dumps(data)) - f.write("\n") - f.flush() - - df = daft.read_json([f1.name, f2.name]) + with create_temp_filename() as f1name, create_temp_filename() as f2name: + with open(f1name, "w") as f1, open(f2name, "w") as f2: + for f in (f1, f2): + for data in valid_data: + f.write(json.dumps(data)) + f.write("\n") + f.flush() + + df = daft.read_json([f1name, f2name]) assert df.column_names == COL_NAMES pd_df = df.to_pandas() @@ -561,11 +577,12 @@ def test_create_dataframe_multiple_jsons(valid_data: list[dict[str, float]]) -> reason="requires PyRunner to be in use", ) def test_create_dataframe_json_custom_fs(valid_data: list[dict[str, float]]) -> None: - with tempfile.NamedTemporaryFile("w") as f: - for data in valid_data: - f.write(json.dumps(data)) - f.write("\n") - f.flush() + with create_temp_filename() as fname: + with open(fname, "w") as f: + for data in valid_data: + f.write(json.dumps(data)) + f.write("\n") + f.flush() # Mark that this 
filesystem instance shouldn't be automatically reused by fsspec; without this, # fsspec would cache this instance and reuse it for Daft's default construction of filesystems, @@ -575,7 +592,7 @@ def test_create_dataframe_json_custom_fs(valid_data: list[dict[str, float]]) -> with patch.object(fs, "info", wraps=fs.info) as mock_info, patch.object( fs, "open", wraps=fs.open ) as mock_open, patch("daft.filesystem._get_fs_from_cache", mock_cache): - df = daft.read_json(f.name, fs=fs) + df = daft.read_json(fname, fs=fs) # Check that info() is called on the passed filesystem. mock_info.assert_called() @@ -591,15 +608,16 @@ def test_create_dataframe_json_custom_fs(valid_data: list[dict[str, float]]) -> def test_create_dataframe_json_column_projection(valid_data: list[dict[str, float]]) -> None: - with tempfile.NamedTemporaryFile("w") as f: - for data in valid_data: - f.write(json.dumps(data)) - f.write("\n") - f.flush() + with create_temp_filename() as fname: + with open(fname, "w") as f: + for data in valid_data: + f.write(json.dumps(data)) + f.write("\n") + f.flush() col_subset = COL_NAMES[:3] - df = daft.read_json(f.name) + df = daft.read_json(fname) df = df.select(*col_subset) assert df.column_names == col_subset @@ -616,14 +634,15 @@ def test_create_dataframe_json_https() -> None: def test_create_dataframe_json_specify_schema(valid_data: list[dict[str, float]]) -> None: - with tempfile.NamedTemporaryFile("w") as f: - for data in valid_data: - f.write(json.dumps(data)) - f.write("\n") - f.flush() + with create_temp_filename() as fname: + with open(fname, "w") as f: + for data in valid_data: + f.write(json.dumps(data)) + f.write("\n") + f.flush() df = daft.read_json( - f.name, + fname, schema_hints={ "sepal_length": DataType.float32(), "sepal_width": DataType.float32(), @@ -646,12 +665,13 @@ def test_create_dataframe_json_specify_schema(valid_data: list[dict[str, float]] @pytest.mark.parametrize("use_native_downloader", [True, False]) def test_create_dataframe_parquet(valid_data: list[dict[str, float]], use_native_downloader) -> None: - with tempfile.NamedTemporaryFile("w") as f: - table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) - papq.write_table(table, f.name) - f.flush() + with create_temp_filename() as fname: + with open(fname, "w") as f: + table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) + papq.write_table(table, f.name) + f.flush() - df = daft.read_parquet(f.name, use_native_downloader=use_native_downloader) + df = daft.read_parquet(fname, use_native_downloader=use_native_downloader) assert df.column_names == COL_NAMES pd_df = df.to_pandas() @@ -661,12 +681,13 @@ def test_create_dataframe_parquet(valid_data: list[dict[str, float]], use_native @pytest.mark.parametrize("use_native_downloader", [True, False]) def test_create_dataframe_parquet_with_filter(valid_data: list[dict[str, float]], use_native_downloader) -> None: - with tempfile.NamedTemporaryFile("w") as f: - table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) - papq.write_table(table, f.name) - f.flush() + with create_temp_filename() as fname: + with open(fname, "w") as f: + table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) + papq.write_table(table, f.name) + f.flush() - df = daft.read_parquet(f.name, use_native_downloader=use_native_downloader) + df = daft.read_parquet(fname, use_native_downloader=use_native_downloader) assert df.column_names == COL_NAMES df = 
df.where(daft.col("sepal_length") > 4.8) @@ -678,13 +699,14 @@ def test_create_dataframe_parquet_with_filter(valid_data: list[dict[str, float]] @pytest.mark.parametrize("use_native_downloader", [True, False]) def test_create_dataframe_multiple_parquets(valid_data: list[dict[str, float]], use_native_downloader) -> None: - with tempfile.NamedTemporaryFile("w") as f1, tempfile.NamedTemporaryFile("w") as f2: - for f in (f1, f2): - table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) - papq.write_table(table, f.name) - f.flush() - - df = daft.read_parquet([f1.name, f2.name], use_native_downloader=use_native_downloader) + with create_temp_filename() as f1name, create_temp_filename() as f2name: + with open(f1name, "w") as f1, open(f2name, "w") as f2: + for f in (f1, f2): + table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) + papq.write_table(table, f.name) + f.flush() + + df = daft.read_parquet([f1name, f2name], use_native_downloader=use_native_downloader) assert df.column_names == COL_NAMES pd_df = df.to_pandas() @@ -697,10 +719,11 @@ def test_create_dataframe_multiple_parquets(valid_data: list[dict[str, float]], reason="requires PyRunner to be in use", ) def test_create_dataframe_parquet_custom_fs(valid_data: list[dict[str, float]]) -> None: - with tempfile.NamedTemporaryFile("w") as f: - table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) - papq.write_table(table, f.name) - f.flush() + with create_temp_filename() as fname: + with open(fname, "w") as f: + table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) + papq.write_table(table, f.name) + f.flush() # Mark that this filesystem instance shouldn't be automatically reused by fsspec; without this, # fsspec would cache this instance and reuse it for Daft's default construction of filesystems, @@ -710,7 +733,7 @@ def test_create_dataframe_parquet_custom_fs(valid_data: list[dict[str, float]]) with patch.object(fs, "info", wraps=fs.info) as mock_info, patch.object( fs, "open", wraps=fs.open ) as mock_open, patch("daft.filesystem._get_fs_from_cache", mock_cache): - df = daft.read_parquet(f.name, fs=fs) + df = daft.read_parquet(fname, fs=fs) # Check that info() is called on the passed filesystem. 
mock_info.assert_called() @@ -727,14 +750,15 @@ def test_create_dataframe_parquet_custom_fs(valid_data: list[dict[str, float]]) @pytest.mark.parametrize("use_native_downloader", [True, False]) def test_create_dataframe_parquet_column_projection(valid_data: list[dict[str, float]], use_native_downloader) -> None: - with tempfile.NamedTemporaryFile("w") as f: - table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) - papq.write_table(table, f.name) - f.flush() + with create_temp_filename() as fname: + with open(fname, "w") as f: + table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) + papq.write_table(table, fname) + f.flush() col_subset = COL_NAMES[:3] - df = daft.read_parquet(f.name, use_native_downloader=use_native_downloader) + df = daft.read_parquet(fname, use_native_downloader=use_native_downloader) df = df.select(*col_subset) assert df.column_names == col_subset @@ -745,13 +769,14 @@ def test_create_dataframe_parquet_column_projection(valid_data: list[dict[str, f @pytest.mark.parametrize("use_native_downloader", [True, False]) def test_create_dataframe_parquet_specify_schema(valid_data: list[dict[str, float]], use_native_downloader) -> None: - with tempfile.NamedTemporaryFile("w") as f: - table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) - papq.write_table(table, f.name) - f.flush() + with create_temp_filename() as fname: + with open(fname, "w") as f: + table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) + papq.write_table(table, fname) + f.flush() df = daft.read_parquet( - f.name, + fname, schema_hints={ "sepal_length": DataType.float64(), "sepal_width": DataType.float64(), diff --git a/tests/dataframe/test_repr.py b/tests/dataframe/test_repr.py index 92e16fccf2..5ddd9590f9 100644 --- a/tests/dataframe/test_repr.py +++ b/tests/dataframe/test_repr.py @@ -200,7 +200,7 @@ def test_repr_html_custom_hooks(): df.collect() assert ( - df.__repr__() + df.__repr__().replace("\r", "") == """+-------------------+-------------+----------------------------------+ | objects | np | pil | | Python | Python | Python | diff --git a/tests/integration/test_tpch.py b/tests/integration/test_tpch.py index 4b899da2e1..07db634275 100644 --- a/tests/integration/test_tpch.py +++ b/tests/integration/test_tpch.py @@ -2,6 +2,7 @@ import pathlib import sqlite3 +import sys import pandas as pd import pytest @@ -15,6 +16,9 @@ # Hardcode scale factor to 200M for local testing SCALE_FACTOR = 0.2 +if sys.platform == "win32": + pytest.skip(allow_module_level=True) + @pytest.fixture(scope="session", autouse=True, params=[1, 2]) def gen_tpch(request): diff --git a/tests/series/test_concat.py b/tests/series/test_concat.py index 48f7bd06e0..635e15ef72 100644 --- a/tests/series/test_concat.py +++ b/tests/series/test_concat.py @@ -99,9 +99,9 @@ def test_series_concat_tensor_array_ray(chunks) -> None: chunk_size = 3 chunk_shape = (chunk_size,) + element_shape chunks = [ - np.arange(i * chunk_size * num_elements_per_tensor, (i + 1) * chunk_size * num_elements_per_tensor).reshape( - chunk_shape - ) + np.arange( + i * chunk_size * num_elements_per_tensor, (i + 1) * chunk_size * num_elements_per_tensor, dtype=np.int64 + ).reshape(chunk_shape) for i in range(chunks) ] series = [Series.from_arrow(ArrowTensorArray.from_numpy(chunk)) for chunk in chunks] diff --git a/tests/series/test_tensor.py b/tests/series/test_tensor.py index 139f31fe57..6e9295fdff 100644 --- a/tests/series/test_tensor.py +++ 
b/tests/series/test_tensor.py @@ -123,7 +123,7 @@ def test_tensor_repr(): arrs = [arr, arr, None] s = Series.from_pylist(arrs, pyobj="allow") assert ( - repr(s) + repr(s).replace("\r", "") == """ +-----------------------+ | list_series | diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index 0806e62f4c..ed2588cc68 100644 --- a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -2,6 +2,7 @@ import contextlib import datetime +import os import pathlib import tempfile @@ -45,9 +46,10 @@ def test_read_input(tmpdir): @contextlib.contextmanager def _parquet_write_helper(data: pa.Table, row_group_size: int = None, papq_write_table_kwargs: dict = {}): - with tempfile.NamedTemporaryFile() as tmpfile: - papq.write_table(data, tmpfile.name, row_group_size=row_group_size, **papq_write_table_kwargs) - yield tmpfile.name + with tempfile.TemporaryDirectory() as directory_name: + file = os.path.join(directory_name, "tempfile") + papq.write_table(data, file, row_group_size=row_group_size, **papq_write_table_kwargs) + yield file @pytest.mark.parametrize( diff --git a/tests/test_schema.py b/tests/test_schema.py index 0ce2912973..53442c4b3e 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -64,7 +64,7 @@ def test_schema_to_name_set(): def test_repr(): schema = TABLE.schema() assert ( - repr(schema) + repr(schema).replace("\r", "") == """+-------+---------+--------+---------+ | int | float | string | bool | | Int64 | Float64 | Utf8 | Boolean | diff --git a/tests/udf_library/test_url_udfs.py b/tests/udf_library/test_url_udfs.py index 3a7e15a6e0..244d2741cd 100644 --- a/tests/udf_library/test_url_udfs.py +++ b/tests/udf_library/test_url_udfs.py @@ -1,6 +1,7 @@ from __future__ import annotations import pathlib +import sys import uuid import pandas as pd @@ -16,8 +17,9 @@ def _get_filename(): name = str(uuid.uuid4()) - # Inject colons into the name - name += ":foo:bar" + # Inject colons into the name if not windows + if sys.platform != "win32": + name += ":foo:bar" return name @@ -39,7 +41,6 @@ def test_download(files, use_native_downloader): pd_df = pd.DataFrame.from_dict({"filenames": [str(f) for f in files]}) pd_df["bytes"] = pd.Series([pathlib.Path(fn).read_bytes() for fn in files]) assert_df_equals(df.to_pandas(), pd_df, sort_key="filenames") - print(_) @pytest.mark.skipif(get_context().runner_config.name not in {"py"}, reason="requires PyRunner to be in use")
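
Note for reviewers: the core Windows-path handling in this patch is the drive-letter check added to `get_protocol_from_path` in `daft/filesystem.py`; the Rust `parse_url` in `src/daft-io/src/lib.rs` applies the same rule under `cfg(target_env = "msvc")`. Below is a minimal, self-contained sketch of that logic for reference only — the `__main__` examples are illustrative and are not part of the patch.

```python
# Sketch of the Windows-aware protocol detection added to daft/filesystem.py.
# Not part of the diff; shown standalone so the rule is easy to verify.
import sys
import urllib.parse


def get_protocol_from_path(path: str) -> str:
    """Return the filesystem protocol for `path`.

    On Windows, urlparse(r"C:\\data\\file.csv") reports scheme "c" because the
    drive letter looks like a URL scheme; a one-letter scheme is therefore
    treated as a local file path rather than a real protocol.
    """
    parsed_scheme = urllib.parse.urlparse(path, allow_fragments=False).scheme
    parsed_scheme = parsed_scheme.lower()
    if parsed_scheme == "" or parsed_scheme is None:
        return "file"
    # Drive-letter case: single ASCII letter scheme on a Windows host.
    if sys.platform == "win32" and len(parsed_scheme) == 1 and "a" <= parsed_scheme <= "z":
        return "file"
    return parsed_scheme


if __name__ == "__main__":
    print(get_protocol_from_path("s3://bucket/key.parquet"))  # -> "s3"
    print(get_protocol_from_path("/tmp/data.csv"))            # -> "file"
    # On a Windows host this also resolves to "file"; on other platforms
    # urlparse still reports the drive letter as the scheme.
    print(get_protocol_from_path(r"C:\data\file.csv"))
```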