From d1224d04cb5beccb0dfd891d3221d29c3700b806 Mon Sep 17 00:00:00 2001 From: Gijs Burghoorn Date: Wed, 19 Jun 2024 12:23:26 +0200 Subject: [PATCH] feat: Expand NDJson glob into one SCAN (#17063) --- crates/polars-lazy/src/scan/ndjson.rs | 38 +++++++++++++++++ .../src/executors/scan/ndjson.rs | 42 ++++++++++++++----- py-polars/tests/unit/io/test_lazy_json.py | 10 +++++ 3 files changed, 80 insertions(+), 10 deletions(-) diff --git a/crates/polars-lazy/src/scan/ndjson.rs b/crates/polars-lazy/src/scan/ndjson.rs index 89022901e9b11..e51f5f2217984 100644 --- a/crates/polars-lazy/src/scan/ndjson.rs +++ b/crates/polars-lazy/src/scan/ndjson.rs @@ -91,6 +91,44 @@ impl LazyJsonLineReader { } impl LazyFileListReader for LazyJsonLineReader { + fn finish(self) -> PolarsResult { + if !self.glob() { + return self.finish_no_glob(); + } + + let paths = self.expand_paths()?.0; + + let file_options = FileScanOptions { + n_rows: self.n_rows, + with_columns: None, + cache: false, + row_index: self.row_index, + rechunk: self.rechunk, + file_counter: 0, + hive_options: Default::default(), + }; + + let options = NDJsonReadOptions { + n_threads: None, + infer_schema_length: self.infer_schema_length, + chunk_size: NonZeroUsize::new(1 << 18).unwrap(), + low_memory: self.low_memory, + ignore_errors: self.ignore_errors, + schema: self.schema, + }; + + let scan_type = FileScan::NDJson { options }; + + Ok(LazyFrame::from(DslPlan::Scan { + paths, + file_info: None, + hive_parts: None, + predicate: None, + file_options, + scan_type, + })) + } + fn finish_no_glob(self) -> PolarsResult { let file_options = FileScanOptions { n_rows: self.n_rows, diff --git a/crates/polars-mem-engine/src/executors/scan/ndjson.rs b/crates/polars-mem-engine/src/executors/scan/ndjson.rs index 0750905b44e55..7111e278f69fc 100644 --- a/crates/polars-mem-engine/src/executors/scan/ndjson.rs +++ b/crates/polars-mem-engine/src/executors/scan/ndjson.rs @@ -35,28 +35,50 @@ impl JsonExec { .as_ref() .unwrap_right(); + let mut n_rows = self.file_scan_options.n_rows; + let dfs = self .paths .iter() - .map(|p| { - let df = JsonLineReader::from_path(p)? + .map_while(|p| { + if n_rows == Some(0) { + return None; + } + + let reader = match JsonLineReader::from_path(p) { + Ok(r) => r, + Err(e) => return Some(Err(e)), + }; + + let df = reader .with_schema(schema.clone()) .with_rechunk(self.file_scan_options.rechunk) .with_chunk_size(Some(self.options.chunk_size)) .low_memory(self.options.low_memory) - .with_n_rows(self.file_scan_options.n_rows) + .with_n_rows(n_rows) .with_ignore_errors(self.options.ignore_errors) - .finish()?; + .finish(); - if let Some(row_index) = &mut self.file_scan_options.row_index { - let offset = row_index.offset; - row_index.offset += df.height() as IdxSize; - df.with_row_index(row_index.name.as_ref(), Some(offset)) - } else { - Ok(df) + let df = match df { + Ok(df) => df, + Err(e) => return Some(Err(e)), + }; + + if let Some(ref mut n_rows) = n_rows { + *n_rows -= df.height(); } + + Some(match self.file_scan_options.row_index { + Some(ref mut row_index) => { + let offset = row_index.offset; + row_index.offset += df.height() as IdxSize; + df.with_row_index(row_index.name.as_ref(), Some(offset)) + }, + None => Ok(df), + }) }) .collect::>>()?; + accumulate_dataframes_vertical(dfs) } } diff --git a/py-polars/tests/unit/io/test_lazy_json.py b/py-polars/tests/unit/io/test_lazy_json.py index 253250a13d470..c3e8a7d055786 100644 --- a/py-polars/tests/unit/io/test_lazy_json.py +++ b/py-polars/tests/unit/io/test_lazy_json.py @@ -138,3 +138,13 @@ def test_ndjson_list_arg(io_files_path: Path) -> None: assert df.shape == (54, 4) assert df.row(-1) == ("seafood", 194, 12.0, 1) assert df.row(0) == ("vegetables", 45, 0.5, 2) + + +def test_glob_single_scan(io_files_path: Path) -> None: + file_path = io_files_path / "foods*.ndjson" + df = pl.scan_ndjson(file_path, n_rows=40) + + explain = df.explain() + + assert explain.count("SCAN") == 1 + assert "UNION" not in explain