Skip to content

Commit

Permalink
FIX-#7321: Using 'C' engine instead of 'pyarrow' for getting metadata…
Browse files Browse the repository at this point in the history
… in 'read_csv' (#7322)

Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev authored Jun 18, 2024
1 parent 35298c0 commit 08c1b11
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 16 deletions.
34 changes: 19 additions & 15 deletions modin/core/io/text/text_file_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ def partitioned_file(
newline: bytes = None,
header_size: int = 0,
pre_reading: int = 0,
read_callback_kw: dict = None,
get_metadata_kw: dict = None,
):
"""
Compute chunk sizes in bytes for every partition.
Expand Down Expand Up @@ -244,7 +244,7 @@ def partitioned_file(
Number of rows, that occupied by header.
pre_reading : int, default: 0
Number of rows between header and skipped rows, that should be read.
read_callback_kw : dict, optional
get_metadata_kw : dict, optional
Keyword arguments for `cls.read_callback` to compute metadata if needed.
This option is not compatible with `pre_reading!=0`.
Expand All @@ -255,11 +255,11 @@ def partitioned_file(
int : partition start read byte
int : partition end read byte
pandas.DataFrame or None
Dataframe from which metadata can be retrieved. Can be None if `read_callback_kw=None`.
Dataframe from which metadata can be retrieved. Can be None if `get_metadata_kw=None`.
"""
if read_callback_kw is not None and pre_reading != 0:
if get_metadata_kw is not None and pre_reading != 0:
raise ValueError(
f"Incompatible combination of parameters: {read_callback_kw=}, {pre_reading=}"
f"Incompatible combination of parameters: {get_metadata_kw=}, {pre_reading=}"
)
read_rows_counter = 0
outside_quotes = True
Expand Down Expand Up @@ -297,11 +297,11 @@ def partitioned_file(
rows_skipper(skiprows)
else:
rows_skipper(skiprows)
if read_callback_kw:
if get_metadata_kw:
start = f.tell()
# For correct behavior, if we want to avoid double skipping rows,
# we need to get metadata after skipping.
pd_df_metadata = cls.read_callback(f, **read_callback_kw)
pd_df_metadata = cls.read_callback(f, **get_metadata_kw)
f.seek(start)
rows_skipper(header_size)

Expand Down Expand Up @@ -1063,28 +1063,32 @@ def _read(cls, filepath_or_buffer, **kwargs):
and (usecols is None or skiprows is None)
and pre_reading == 0
)
read_callback_kw = dict(kwargs, nrows=1, skipfooter=0, index_col=index_col)
get_metadata_kw = dict(kwargs, nrows=1, skipfooter=0, index_col=index_col)
if get_metadata_kw.get("engine", None) == "pyarrow":
# pyarrow engine doesn't support `nrows` option;
# https://github.com/pandas-dev/pandas/issues/38872 can be used to track pyarrow engine features
get_metadata_kw["engine"] = "c"
if not can_compute_metadata_while_skipping_rows:
pd_df_metadata = cls.read_callback(
filepath_or_buffer_md,
**read_callback_kw,
**get_metadata_kw,
)
column_names = pd_df_metadata.columns
column_widths, num_splits = cls._define_metadata(
pd_df_metadata, column_names
)
read_callback_kw = None
get_metadata_kw = None
else:
read_callback_kw = dict(read_callback_kw, skiprows=None)
get_metadata_kw = dict(get_metadata_kw, skiprows=None)
# `memory_map` doesn't work with file-like object so we can't use it here.
# We can definitely skip it without violating the reading logic
# since this parameter is intended to optimize reading.
# For reading a couple of lines, this is not essential.
read_callback_kw.pop("memory_map", None)
get_metadata_kw.pop("memory_map", None)
# These parameters are already used when opening file `f`,
# they do not need to be used again.
read_callback_kw.pop("storage_options", None)
read_callback_kw.pop("compression", None)
get_metadata_kw.pop("storage_options", None)
get_metadata_kw.pop("compression", None)

with OpenFile(
filepath_or_buffer_md,
Expand All @@ -1110,7 +1114,7 @@ def _read(cls, filepath_or_buffer, **kwargs):
newline=newline,
header_size=header_size,
pre_reading=pre_reading,
read_callback_kw=read_callback_kw,
get_metadata_kw=get_metadata_kw,
)
if can_compute_metadata_while_skipping_rows:
pd_df_metadata = pd_df_metadata_temp
Expand Down
2 changes: 1 addition & 1 deletion modin/tests/pandas/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ def test_read_csv_encoding_976(self, pathlike):
# Quoting, Compression parameters tests
@pytest.mark.parametrize("compression", ["infer", "gzip", "bz2", "xz", "zip"])
@pytest.mark.parametrize("encoding", [None, "latin8", "utf16"])
@pytest.mark.parametrize("engine", [None, "python", "c"])
@pytest.mark.parametrize("engine", [None, "python", "c", "pyarrow"])
def test_read_csv_compression(self, make_csv_file, compression, encoding, engine):
unique_filename = make_csv_file(encoding=encoding, compression=compression)
expected_exception = None
Expand Down

0 comments on commit 08c1b11

Please sign in to comment.