Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#4479: Prevent users from using a local filepath when performing a distributed write #4484

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/release_notes/release_notes-0.15.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Key Features and Updates
*
* Developer API enhancements
* FEAT-#4359: Add __dataframe__ method to the protocol dataframe (#4360)
* FIX-#4479: Prevent users from using a local filepath when performing a distributed write (#4484)
* Update testing suite
* TEST-#4363: Use Ray from pypi in CI (#4364)
* FIX-#4422: get rid of case sensitivity for `warns_that_defaulting_to_pandas` (#4423)
Expand Down
25 changes: 25 additions & 0 deletions modin/core/execution/ray/implementations/pandas_on_ray/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from modin.core.execution.ray.common import RayTask, SignalActor
from ..dataframe import PandasOnRayDataframe
from ..partitioning import PandasOnRayDataframePartition
from modin.core.io.utils import is_local_path


class PandasOnRayIO(RayIO):
Expand Down Expand Up @@ -165,6 +166,18 @@
if not cls._to_csv_check_support(kwargs):
return RayIO.to_csv(qc, **kwargs)

if len(ray.nodes()) > 1 and (
not isinstance(kwargs["path_or_buf"], str)
or is_local_path(kwargs["path_or_buf"])
):
from modin.error_message import ErrorMessage

Check warning on line 173 in modin/core/execution/ray/implementations/pandas_on_ray/io/io.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/io/io.py#L173

Added line #L173 was not covered by tests

ErrorMessage.single_warning(

Check warning on line 175 in modin/core/execution/ray/implementations/pandas_on_ray/io/io.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/io/io.py#L175

Added line #L175 was not covered by tests
"`path_or_buf` must point to a networked file or distributed filesystem (e.g. S3) "
+ "when in cluster mode. Defaulting to pandas for `to_csv`"
)
return RayIO.to_csv(qc, **kwargs)

Check warning on line 179 in modin/core/execution/ray/implementations/pandas_on_ray/io/io.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/io/io.py#L179

Added line #L179 was not covered by tests

signals = SignalActor.remote(len(qc._modin_frame._partitions) + 1)

def func(df, **kw):
Expand Down Expand Up @@ -277,6 +290,18 @@
if not cls._to_parquet_check_support(kwargs):
return RayIO.to_parquet(qc, **kwargs)

if len(ray.nodes()) > 1 and (
not isinstance(kwargs["path_or_buf"], str)
or is_local_path(kwargs["path_or_buf"])
):
from modin.error_message import ErrorMessage

Check warning on line 297 in modin/core/execution/ray/implementations/pandas_on_ray/io/io.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/io/io.py#L297

Added line #L297 was not covered by tests

ErrorMessage.single_warning(

Check warning on line 299 in modin/core/execution/ray/implementations/pandas_on_ray/io/io.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/io/io.py#L299

Added line #L299 was not covered by tests
"`path_or_buf` must point to a networked file or distributed filesystem (e.g. S3) "
+ "when in cluster mode. Defaulting to pandas for `to_parquet`"
)
return RayIO.to_parquet(qc, **kwargs)

Check warning on line 303 in modin/core/execution/ray/implementations/pandas_on_ray/io/io.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/io/io.py#L303

Added line #L303 was not covered by tests

def func(df, **kw):
"""
Dump a chunk of rows as parquet, then save them to target maintaining order.
Expand Down
68 changes: 68 additions & 0 deletions modin/core/io/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

"""Collection of utility functions for distributed io."""

import os
import re
import fsspec

IS_FILE_ONLY_REGEX = re.compile(f"[^\\{os.sep}]*\.\w+") # noqa: W605


def is_local_path(path) -> bool:
"""
Return ``True`` if the specified `path` is a local path, ``False`` otherwise.

Parameters
----------
path : str, path object or file-like object
The path to check.

Returns
-------
Whether the `path` points to a local file.

Notes
-----
If the filesystem corresponds to a `ZipFileSystem`, `TarFileSystem` or `CachingFileSystem`,
this code will return `False` even if it is local.
"""
try:
if IS_FILE_ONLY_REGEX.match(path) is not None:
# If we are passed just a filename, we will perform our check on the current working
# directory.
parent_dir = os.getcwd()
else:
# If we are passed a full path, we want to remove the filename from it.
parent_dir = os.sep.join(path.split(os.sep)[:-1])
fs = fsspec.core.url_to_fs(parent_dir)[0] # Grab just the FileSystem object
if hasattr(
fs, "local_file"
): # If the FS does not have the `local_file` attr, it is not local.
# We still need to check that it is not a mounted file - as fsspec treats mounted
# files the same as local ones, but we want to distinguish between local and mounted.
if os.name == "nt" and parent_dir[:3] == "D:\\":
# In Windows, os.path.abspath(os.sep) will give us the C Drive, but we want the
# D drive to also be marked as local.
local_device_id = os.stat("D:\\")

Check warning on line 58 in modin/core/io/utils.py

View check run for this annotation

Codecov / codecov/patch

modin/core/io/utils.py#L58

Added line #L58 was not covered by tests
else:
local_device_id = os.stat(os.path.abspath(os.sep)).st_dev
path_device_id = os.stat(parent_dir).st_dev
return path_device_id == local_device_id
return False
except Exception:
# If an exception is raised, it means we tried to open a filesystem that requires additional
# dependencies. This means that it is definitely not a local filesystem, so we can return
# `False` here.
return False
23 changes: 23 additions & 0 deletions modin/pandas/test/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from modin.utils import to_pandas
from modin.pandas.utils import from_arrow
from modin.test.test_utils import warns_that_defaulting_to_pandas
from modin.core.io.utils import is_local_path
import pyarrow as pa
import os
from scipy import sparse
Expand Down Expand Up @@ -2378,3 +2379,25 @@ def test_to_period():
)
modin_df, pandas_df = create_test_dfs(TEST_DATA, index=index)
df_equals(modin_df.to_period(), pandas_df.to_period())


def test_is_local_path():
s3_path = "s3://modin-example-bucket/modin-example-file"
assert not is_local_path(s3_path), "S3 Path incorrectly flagged as local!"
azure_blob_path = "https://modin-example-storage-account.blob.core.windows.net/modin-example-container/modin-example-file"
assert not is_local_path(
azure_blob_path
), "Azure Blob Storage Path incorrectly flagged as local!"
gcs_path = "gs://modin-example-bucket/modin-example-file"
assert not is_local_path(gcs_path), "GCS Path incorrectly flagged as local!"
assert is_local_path(
os.getcwd()
), "Current Working Directory incorrectly flagged as not local!"
new_file = os.getcwd() + "/modin-example-file.extension"
assert is_local_path(
new_file
), "Non-existent file under current working directory incorrectly flagged as not local!"
new_file_in_curr_dir = "modin-example-file.extension"
assert is_local_path(
new_file_in_curr_dir,
), "Non-existent file without absolute path incorrectly flagged as not local!"