From 575e91759f6a6a720cc0e1e50b97afac436a0240 Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Mon, 6 May 2024 16:24:44 +0200
Subject: [PATCH 01/11] fix: Lazy csv + projection; respect null values arg
 (#16077)

---
 crates/polars-io/src/csv/read/options.rs   | 11 -----------
 crates/polars-io/src/csv/read/parser.rs    |  2 +-
 crates/polars-io/src/csv/read/read_impl.rs | 10 ++--------
 py-polars/tests/unit/io/test_lazy_csv.py  | 23 ++++++++++++++++++++++
 4 files changed, 26 insertions(+), 20 deletions(-)

diff --git a/crates/polars-io/src/csv/read/options.rs b/crates/polars-io/src/csv/read/options.rs
index 3741bd6d9e47..2764d085d093 100644
--- a/crates/polars-io/src/csv/read/options.rs
+++ b/crates/polars-io/src/csv/read/options.rs
@@ -133,17 +133,6 @@ pub(super) enum NullValuesCompiled {
 }
 
 impl NullValuesCompiled {
-    pub(super) fn apply_projection(&mut self, projections: &[usize]) {
-        if let Self::Columns(nv) = self {
-            let nv = projections
-                .iter()
-                .map(|i| std::mem::take(&mut nv[*i]))
-                .collect::<Vec<_>>();
-
-            *self = NullValuesCompiled::Columns(nv);
-        }
-    }
-
     /// # Safety
     ///
     /// The caller must ensure that `index` is in bounds
diff --git a/crates/polars-io/src/csv/read/parser.rs b/crates/polars-io/src/csv/read/parser.rs
index 29845c990621..eff02dd4f19e 100644
--- a/crates/polars-io/src/csv/read/parser.rs
+++ b/crates/polars-io/src/csv/read/parser.rs
@@ -511,7 +511,7 @@ pub(super) fn parse_lines(
 
                         // SAFETY:
                         // process fields is in bounds
-                        add_null = unsafe { null_values.is_null(field, processed_fields) }
+                        add_null = unsafe { null_values.is_null(field, idx as usize) }
                     }
                     if add_null {
                         buf.add_null(!missing_is_null && field.is_empty())
diff --git a/crates/polars-io/src/csv/read/read_impl.rs b/crates/polars-io/src/csv/read/read_impl.rs
index a44c2ba8c90e..5805d1898fdc 100644
--- a/crates/polars-io/src/csv/read/read_impl.rs
+++ b/crates/polars-io/src/csv/read/read_impl.rs
@@ -224,8 +224,8 @@ impl<'a> CoreReader<'a> {
             }
         }
 
-        // create a null value for every column
-        let mut null_values = null_values.map(|nv| nv.compile(&schema)).transpose()?;
+        // Create a null value for every column
+        let null_values = null_values.map(|nv| nv.compile(&schema)).transpose()?;
 
         if let Some(cols) = columns {
             let mut prj = Vec::with_capacity(cols.len());
@@ -233,12 +233,6 @@ impl<'a> CoreReader<'a> {
                 let i = schema.try_index_of(&col)?;
                 prj.push(i);
             }
-
-            // update null values with projection
-            if let Some(nv) = null_values.as_mut() {
-                nv.apply_projection(&prj);
-            }
-
             projection = Some(prj);
         }
 
diff --git a/py-polars/tests/unit/io/test_lazy_csv.py b/py-polars/tests/unit/io/test_lazy_csv.py
index 59bb84d72658..59e7291ea522 100644
--- a/py-polars/tests/unit/io/test_lazy_csv.py
+++ b/py-polars/tests/unit/io/test_lazy_csv.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import tempfile
 from collections import OrderedDict
 from typing import TYPE_CHECKING
 
@@ -285,3 +286,25 @@ def test_scan_empty_csv_with_row_index(tmp_path: Path) -> None:
 
     read = pl.scan_csv(file_path).with_row_index("idx")
     assert read.collect().schema == OrderedDict([("idx", pl.UInt32), ("a", pl.String)])
+
+
+@pytest.mark.write_disk()
+def test_csv_null_values_with_projection_15515() -> None:
+    data = """IndCode,SireCode,BirthDate,Flag
+ID00316,.,19940315,
+"""
+
+    with tempfile.NamedTemporaryFile() as f:
+        f.write(data.encode())
+        f.seek(0)
+
+        q = (
+            pl.scan_csv(f.name, null_values={"SireCode": "."})
+            .with_columns(pl.col("SireCode").alias("SireKey"))
+            .select("SireKey", "BirthDate")
+        )
+
+        assert q.collect().to_dict(as_series=False) == {
+            "SireKey":
[None],
+            "BirthDate": [19940315],
+        }

From dd6e2ee86b0db07d89a256b1c8a2e1009e3f51f2 Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Mon, 6 May 2024 18:18:06 +0200
Subject: [PATCH 02/11] fix: Respect user passed 'reader_schema' in 'scan_csv'
 (#16080)

---
 Cargo.lock                                    |  4 +++
 Cargo.toml                                    |  2 +-
 .../src/physical_plan/executors/scan/csv.rs   |  6 +++--
 .../physical_plan/executors/scan/parquet.rs   | 18 ++++++++++---
 .../src/physical_plan/planner/lp.rs           |  2 +-
 .../src/executors/sources/parquet.rs          |  9 +++++--
 crates/polars-plan/Cargo.toml                 |  2 ++
 .../src/logical_plan/conversion/scans.rs      | 24 ++++++++++++------
 crates/polars-plan/src/logical_plan/schema.rs |  5 ++--
 py-polars/tests/unit/io/test_lazy_csv.py      | 25 +++++++++++++++++++
 10 files changed, 79 insertions(+), 18 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index d4332d87aa02..4bb623113671 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1287,6 +1287,9 @@ name = "either"
 version = "1.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a47c1c47d2f5964e29c61246e81db715514cd532db6b5116a25ea3c03d6780a2"
+dependencies = [
+ "serde",
+]
 
 [[package]]
 name = "elliptic-curve"
@@ -3079,6 +3082,7 @@ dependencies = [
  "chrono",
  "chrono-tz",
  "ciborium",
+ "either",
  "futures",
  "hashbrown 0.14.3",
  "libloading",
diff --git a/Cargo.toml b/Cargo.toml
index ab0e1535e01f..3887ac8f39ec 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -38,7 +38,7 @@ chrono-tz = "0.8.1"
 ciborium = "0.2"
 crossbeam-channel = "0.5.8"
 crossbeam-queue = "0.3"
-either = "1.9"
+either = "1.11"
 ethnum = "1.3.2"
 fallible-streaming-iterator = "0.1.9"
 futures = "0.3.25"
diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs b/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs
index 06277d5b054e..69a8df57c41c 100644
--- a/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs
@@ -4,7 +4,7 @@ use super::*;
 
 pub struct CsvExec {
     pub path: PathBuf,
-    pub schema: SchemaRef,
+    pub file_info: FileInfo,
     pub options: CsvReaderOptions,
     pub file_options: FileScanOptions,
     pub predicate: Option<Arc<dyn PhysicalExpr>>,
@@ -26,7 +26,9 @@ impl CsvExec {
         CsvReader::from_path(&self.path)
             .unwrap()
             .has_header(self.options.has_header)
-            .with_dtypes(Some(self.schema.clone()))
+            .with_schema(Some(
+                self.file_info.reader_schema.clone().unwrap().unwrap_right(),
+            ))
             .with_separator(self.options.separator)
             .with_ignore_errors(self.options.ignore_errors)
             .with_skip_rows(self.options.skip_rows)
diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
index 429a87574c1f..695cc58c98c0 100644
--- a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
@@ -91,7 +91,12 @@ impl ParquetExec {
             );
 
             let mut reader = ParquetReader::new(file)
-                .with_schema(self.file_info.reader_schema.clone())
+                .with_schema(
+                    self.file_info
+                        .reader_schema
+                        .clone()
+                        .map(|either| either.unwrap_left()),
+                )
                 .read_parallel(parallel)
                 .set_low_memory(self.options.low_memory)
                 .use_statistics(self.options.use_statistics)
@@ -163,7 +168,9 @@ impl ParquetExec {
             .file_info
             .reader_schema
             .as_ref()
-            .expect("should be set");
+            .expect("should be set")
+            .as_ref()
+            .unwrap_left();
         let first_metadata = &self.metadata;
         let cloud_options = self.cloud_options.as_ref();
         let with_columns = self
@@ -343,7 +350,12 @@ impl ParquetExec {
             );
             return Ok(materialize_empty_df(
                 projection.as_deref(),
self.file_info.reader_schema.as_ref().unwrap(), + self.file_info + .reader_schema + .as_ref() + .unwrap() + .as_ref() + .unwrap_left(), hive_partitions.as_deref(), self.file_options.row_index.as_ref(), )); diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs index 85260425a844..da98d7aa0216 100644 --- a/crates/polars-lazy/src/physical_plan/planner/lp.rs +++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs @@ -239,7 +239,7 @@ pub fn create_physical_plan( let path = paths[0].clone(); Ok(Box::new(executors::CsvExec { path, - schema: file_info.schema, + file_info, options: csv_options, predicate, file_options, diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs index b94028980ede..79e7380c6291 100644 --- a/crates/polars-pipe/src/executors/sources/parquet.rs +++ b/crates/polars-pipe/src/executors/sources/parquet.rs @@ -113,7 +113,7 @@ impl ParquetSource { file_options, projection, chunk_size, - reader_schema, + reader_schema.map(|either| either.unwrap_left()), hive_partitions, )) } @@ -151,7 +151,12 @@ impl ParquetSource { .map(|v| v.as_slice()); check_projected_arrow_schema( batched_reader.schema().as_ref(), - self.file_info.reader_schema.as_ref().unwrap(), + self.file_info + .reader_schema + .as_ref() + .unwrap() + .as_ref() + .unwrap_left(), with_columns, "schema of all files in a single scan_parquet must be equal", )?; diff --git a/crates/polars-plan/Cargo.toml b/crates/polars-plan/Cargo.toml index 18279f58fe00..ee6d0a2d43ee 100644 --- a/crates/polars-plan/Cargo.toml +++ b/crates/polars-plan/Cargo.toml @@ -28,6 +28,7 @@ bytemuck = { workspace = true } chrono = { workspace = true, optional = true } chrono-tz = { workspace = true, optional = true } ciborium = { workspace = true, optional = true } +either = { workspace = true } futures = { workspace = true, optional = true } hashbrown = { workspace = true } once_cell = { workspace = true } @@ -53,6 +54,7 @@ serde = [ "polars-time/serde", "polars-io/serde", "polars-ops/serde", + "either/serde", ] streaming = [] parquet = ["polars-io/parquet", "polars-parquet"] diff --git a/crates/polars-plan/src/logical_plan/conversion/scans.rs b/crates/polars-plan/src/logical_plan/conversion/scans.rs index 84139ff5e713..7d03fa9b7d56 100644 --- a/crates/polars-plan/src/logical_plan/conversion/scans.rs +++ b/crates/polars-plan/src/logical_plan/conversion/scans.rs @@ -1,6 +1,7 @@ use std::io::Read; use std::path::PathBuf; +use either::Either; #[cfg(feature = "cloud")] use polars_io::pl_async::get_runtime; use polars_io::prelude::*; @@ -66,7 +67,7 @@ pub(super) fn parquet_file_info( let mut file_info = FileInfo::new( schema, - Some(reader_schema), + Some(Either::Left(reader_schema)), (num_rows, num_rows.unwrap_or(0)), ); @@ -110,7 +111,7 @@ pub(super) fn ipc_file_info( metadata.schema.as_ref().into(), file_options.row_index.as_ref(), ), - Some(Arc::clone(&metadata.schema)), + Some(Either::Left(Arc::clone(&metadata.schema))), (None, 0), ); @@ -171,14 +172,23 @@ pub(super) fn csv_file_info( .clone() .unwrap_or_else(|| Arc::new(inferred_schema)); - if let Some(rc) = &file_options.row_index { - let schema = Arc::make_mut(&mut schema); - schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE)?; - } + let reader_schema = if let Some(rc) = &file_options.row_index { + let reader_schema = schema.clone(); + let mut output_schema = (*reader_schema).clone(); + output_schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE)?; + 
schema = Arc::new(output_schema);
+        reader_schema
+    } else {
+        schema.clone()
+    };
 
     let n_bytes = reader_bytes.len();
     let estimated_n_rows = (rows_read as f64 / bytes_read as f64 * n_bytes as f64) as usize;
 
     csv_options.skip_rows += csv_options.skip_rows_after_header;
-    Ok(FileInfo::new(schema, None, (None, estimated_n_rows)))
+    Ok(FileInfo::new(
+        schema,
+        Some(Either::Right(reader_schema)),
+        (None, estimated_n_rows),
+    ))
 }
diff --git a/crates/polars-plan/src/logical_plan/schema.rs b/crates/polars-plan/src/logical_plan/schema.rs
index 6c4629a80cb0..2ee480c9727b 100644
--- a/crates/polars-plan/src/logical_plan/schema.rs
+++ b/crates/polars-plan/src/logical_plan/schema.rs
@@ -3,6 +3,7 @@ use std::path::Path;
 use std::sync::Mutex;
 
 use arrow::datatypes::ArrowSchemaRef;
+use either::Either;
 use polars_core::prelude::*;
 use polars_utils::format_smartstring;
 #[cfg(feature = "serde")]
@@ -43,7 +44,7 @@ pub struct FileInfo {
     pub schema: SchemaRef,
     /// Stores the schema used for the reader, as the main schema can contain
     /// extra hive columns.
-    pub reader_schema: Option<ArrowSchemaRef>,
+    pub reader_schema: Option<Either<ArrowSchemaRef, SchemaRef>>,
     /// - known size
     /// - estimated size
     pub row_estimation: (Option<usize>, usize),
@@ -54,7 +55,7 @@ impl FileInfo {
     /// Constructs a new [`FileInfo`].
     pub fn new(
         schema: SchemaRef,
-        reader_schema: Option<ArrowSchemaRef>,
+        reader_schema: Option<Either<ArrowSchemaRef, SchemaRef>>,
         row_estimation: (Option<usize>, usize),
     ) -> Self {
         Self {
diff --git a/py-polars/tests/unit/io/test_lazy_csv.py b/py-polars/tests/unit/io/test_lazy_csv.py
index 59e7291ea522..0079f6ad43ac 100644
--- a/py-polars/tests/unit/io/test_lazy_csv.py
+++ b/py-polars/tests/unit/io/test_lazy_csv.py
@@ -308,3 +308,28 @@ def test_csv_null_values_with_projection_15515() -> None:
         "SireKey": [None],
         "BirthDate": [19940315],
     }
+
+
+@pytest.mark.write_disk()
+def test_csv_respect_user_schema_ragged_lines_15254() -> None:
+    with tempfile.NamedTemporaryFile() as f:
+        f.write(
+            b"""
+A,B,C
+1,2,3
+4,5,6,7,8
+9,10,11
+""".strip()
+        )
+        f.seek(0)
+
+        df = pl.scan_csv(
+            f.name, schema=dict.fromkeys("ABCDE", pl.String), truncate_ragged_lines=True
+        ).collect()
+        assert df.to_dict(as_series=False) == {
+            "A": ["1", "4", "9"],
+            "B": ["2", "5", "10"],
+            "C": ["3", "6", "11"],
+            "D": [None, "7", None],
+            "E": [None, "8", None],
+        }

From d1f3ef41b7fde800b3b54f11cd4deddb45b9dd27 Mon Sep 17 00:00:00 2001
From: Stijn de Gooijer
Date: Mon, 6 May 2024 19:57:56 +0200
Subject: [PATCH 03/11] ci: Fix failures in test coverage workflow (#16083)

---
 codecov.yml => .github/codecov.yml  |  1 -
 .github/workflows/test-coverage.yml | 59 ++++++++++++++++++-----------
 .github/workflows/test-python.yml   |  9 +----
 3 files changed, 39 insertions(+), 30 deletions(-)
 rename codecov.yml => .github/codecov.yml (97%)

diff --git a/codecov.yml b/.github/codecov.yml
similarity index 97%
rename from codecov.yml
rename to .github/codecov.yml
index e41ea517aa75..6b0e3b4c274c 100644
--- a/codecov.yml
+++ b/.github/codecov.yml
@@ -4,7 +4,6 @@ coverage:
   patch: off
 comment:
   require_changes: true
-  after_n_builds: 3
 ignore:
   - crates/polars-arrow/src/io/flight/*.rs
   - crates/polars-arrow/src/io/ipc/append/*.rs
diff --git a/.github/workflows/test-coverage.yml b/.github/workflows/test-coverage.yml
index c8dc051cb18e..d7977c2d9d8e 100644
--- a/.github/workflows/test-coverage.yml
+++ b/.github/workflows/test-coverage.yml
@@ -76,13 +76,10 @@ jobs:
         run: cargo llvm-cov report --lcov --output-path coverage-rust.lcov
 
       - name: Upload coverage report
-        uses: codecov/codecov-action@v4
+        uses: actions/upload-artifact@v4
         with:
-          token: ${{ secrets.CODECOV_TOKEN }}
files: coverage-rust.lcov - root_dir: ${{ github.workspace }} - flags: rust - fail_ci_if_error: true + name: coverage-rust + path: coverage-rust.lcov coverage-python: # Running under ubuntu doesn't seem to work: @@ -94,7 +91,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v5 with: - python-version: '3.11' + python-version: '3.12' - name: Create virtual environment run: | @@ -126,33 +123,51 @@ jobs: - name: Run Python tests working-directory: py-polars - run: pytest --cov -n auto --dist loadgroup -m "not release and not benchmark and not docs" --cov-report xml:main.xml - continue-on-error: true + run: > + pytest + -n auto --dist loadgroup + -m "not release and not benchmark and not docs" + -k 'not test_polars_import' + --cov --cov-report xml:main.xml - name: Run Python tests - async reader working-directory: py-polars env: POLARS_FORCE_ASYNC: 1 - run: pytest --cov -n auto --dist loadgroup -m "not release and not benchmark and not docs" tests/unit/io/ --cov-report xml:async.xml - continue-on-error: true + run: > + pytest tests/unit/io/ + -n auto --dist loadgroup + -m "not release and not benchmark and not docs" + --cov --cov-report xml:async.xml --cov-fail-under=0 - name: Report Rust coverage run: cargo llvm-cov report --lcov --output-path coverage-python.lcov - - name: Upload coverage reports - Python - uses: codecov/codecov-action@v4 + - name: Upload coverage reports + uses: actions/upload-artifact@v4 with: - token: ${{ secrets.CODECOV_TOKEN }} - files: py-polars/main.xml,py-polars/async.xml - root_dir: ${{ github.workspace }} - flags: python - fail_ci_if_error: true + name: coverage-python + path: | + coverage-python.lcov + py-polars/main.xml + py-polars/async.xml + + upload-coverage: + needs: [coverage-rust, coverage-python] + runs-on: ubuntu-latest + + steps: + # Needed to fetch the Codecov config file + - uses: actions/checkout@v4 + + - name: Download coverage reports + uses: actions/download-artifact@v4 + with: + merge-multiple: true - - name: Upload coverage report - Rust + - name: Upload coverage reports uses: codecov/codecov-action@v4 with: token: ${{ secrets.CODECOV_TOKEN }} - files: coverage-python.lcov + files: coverage-rust.lcov,coverage-python.lcov,py-polars/main.xml,py-polars/async.xml root_dir: ${{ github.workspace }} - flags: rust - fail_ci_if_error: true diff --git a/.github/workflows/test-python.yml b/.github/workflows/test-python.yml index d7e21d61e6e7..10a4adc54615 100644 --- a/.github/workflows/test-python.yml +++ b/.github/workflows/test-python.yml @@ -85,14 +85,9 @@ jobs: python tests/docs/run_doctest.py pytest tests/docs/test_user_guide.py -m docs - - name: Run tests and report coverage + - name: Run tests if: github.ref_name != 'main' - env: - # TODO: Re-enable coverage for for Ubuntu + Python 3.12 tests - # Currently skipped due to performance issues in coverage: - # https://github.com/nedbat/coveragepy/issues/1665 - COV: ${{ !(matrix.os == 'ubuntu-latest' && matrix.python-version == '3.12') && '--cov' || '--no-cov' }} - run: pytest $COV -n auto --dist loadgroup -m "not release and not benchmark and not docs" + run: pytest -n auto --dist loadgroup -m "not release and not benchmark and not docs" - name: Run tests async reader tests if: github.ref_name != 'main' && matrix.os != 'windows-latest' From 6ded6f03c85305b5187b7acc21039abae8aaa3e6 Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Mon, 6 May 2024 21:22:16 +0200 Subject: [PATCH 04/11] ci: Bump `sccache` action (#16088) --- .github/workflows/benchmark.yml | 2 +- 1 file changed, 1 insertion(+), 
1 deletion(-)

diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml
index 612adf28f125..6139428e6bc2 100644
--- a/.github/workflows/benchmark.yml
+++ b/.github/workflows/benchmark.yml
@@ -60,7 +60,7 @@ jobs:
           save-if: ${{ github.ref_name == 'main' }}
 
       - name: Run sccache-cache
-        uses: mozilla-actions/sccache-action@v0.0.3
+        uses: mozilla-actions/sccache-action@v0.0.4
 
       - name: Install Polars release build
         env:

From c5489acc7166e2e908f3c64c47b640752f890aeb Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Tue, 7 May 2024 08:55:42 +0200
Subject: [PATCH 05/11] perf: Don't traverse deep datasets that we repr as
 union in CSE (#16096)

---
 .../src/logical_plan/optimizer/cse/cse_lp.rs | 34 +++++++++++++++----
 1 file changed, 28 insertions(+), 6 deletions(-)

diff --git a/crates/polars-plan/src/logical_plan/optimizer/cse/cse_lp.rs b/crates/polars-plan/src/logical_plan/optimizer/cse/cse_lp.rs
index 1316d299cfd2..cde7a0dea710 100644
--- a/crates/polars-plan/src/logical_plan/optimizer/cse/cse_lp.rs
+++ b/crates/polars-plan/src/logical_plan/optimizer/cse/cse_lp.rs
@@ -174,21 +174,37 @@ impl LpIdentifierVisitor<'_> {
     }
 }
 
+fn skip_children(lp: &IR) -> bool {
+    match lp {
+        // Don't visit all the files in a `scan *` operation.
+        // Put an arbitrary limit to 20 files now.
+        IR::Union {
+            options, inputs, ..
+        } => options.from_partitioned_ds && inputs.len() > 20,
+        _ => false,
+    }
+}
+
 impl<'a> Visitor for LpIdentifierVisitor<'a> {
     type Node = IRNode;
     type Arena = IRNodeArena;
 
     fn pre_visit(
         &mut self,
-        _node: &Self::Node,
-        _arena: &Self::Arena,
+        node: &Self::Node,
+        arena: &Self::Arena,
     ) -> PolarsResult<VisitRecursion> {
         self.visit_stack
             .push(VisitRecord::Entered(self.pre_visit_idx));
         self.pre_visit_idx += 1;
 
         self.identifier_array.push((0, Identifier::new()));
-        Ok(VisitRecursion::Continue)
+
+        if skip_children(node.to_alp(&arena.0)) {
+            Ok(VisitRecursion::Skip)
+        } else {
+            Ok(VisitRecursion::Continue)
+        }
     }
 
     fn post_visit(
@@ -256,7 +272,7 @@ impl<'a> RewritingVisitor for CommonSubPlanRewriter<'a> {
 
     fn pre_visit(
         &mut self,
-        _lp_node: &Self::Node,
+        lp_node: &Self::Node,
         arena: &mut Self::Arena,
     ) -> PolarsResult<RewriteRecursion> {
         if self.visited_idx >= self.identifier_array.len()
@@ -270,7 +286,7 @@ impl<'a> RewritingVisitor for CommonSubPlanRewriter<'a> {
         // Id placeholder not overwritten, so we can skip this sub-expression.
         if !id.is_valid() {
             self.visited_idx += 1;
-            return Ok(RewriteRecursion::MutateAndContinue);
+            return Ok(RewriteRecursion::NoMutateAndContinue);
         }
 
         let Some((_, count)) = self.sp_count.get(id, &arena.0, &arena.1) else {
@@ -281,7 +297,13 @@ impl<'a> RewritingVisitor for CommonSubPlanRewriter<'a> {
         if *count > 1 {
             // Rewrite this sub-plan, don't visit its children
             Ok(RewriteRecursion::MutateAndStop)
-        } else {
+        }
+        // Never mutate if count <= 1. The post-visit will search for the node, and not be able to find it
+        else {
+            // Don't traverse the children.
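+            // (i.e. unions that `skip_children` above flags: scans over
+            // partitioned datasets with more than 20 inputs)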
+ if skip_children(lp_node.to_alp(&arena.0)) { + return Ok(RewriteRecursion::Stop); + } // This is a unique plan // visit its children to see if they are cse self.visited_idx += 1; From 2970c5706a5bbc39840ca5b0ffc9d4b3aab502cb Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Tue, 7 May 2024 08:56:00 +0200 Subject: [PATCH 06/11] python Polars 0.20.24 (#16089) --- Cargo.lock | 2 +- py-polars/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4bb623113671..528322bc0fff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3249,7 +3249,7 @@ dependencies = [ [[package]] name = "py-polars" -version = "0.20.23" +version = "0.20.24" dependencies = [ "ahash", "arboard", diff --git a/py-polars/Cargo.toml b/py-polars/Cargo.toml index 97fc718755b7..d296c96c7c11 100644 --- a/py-polars/Cargo.toml +++ b/py-polars/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "py-polars" -version = "0.20.23" +version = "0.20.24" edition = "2021" [lib] From acb601d6ce0608e72cae22b1b24ea275e743173b Mon Sep 17 00:00:00 2001 From: wsyxbcl Date: Tue, 7 May 2024 23:31:39 +0800 Subject: [PATCH 07/11] fix(python): Improve error handling of `ParameterCollisionError` in `read_excel` (#16100) --- py-polars/polars/io/spreadsheet/functions.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/py-polars/polars/io/spreadsheet/functions.py b/py-polars/polars/io/spreadsheet/functions.py index 2a77037eb588..2947129fd175 100644 --- a/py-polars/polars/io/spreadsheet/functions.py +++ b/py-polars/polars/io/spreadsheet/functions.py @@ -503,13 +503,17 @@ def _read_spreadsheet( # normalise some top-level parameters to 'read_options' entries if engine == "calamine": - if "schema_sample_rows" in read_options: + if ("schema_sample_rows" in read_options) and ( + infer_schema_length != N_INFER_DEFAULT + ): msg = 'cannot specify both `infer_schema_length` and `read_options["schema_sample_rows"]`' raise ParameterCollisionError(msg) read_options["schema_sample_rows"] = infer_schema_length elif engine == "xlsx2csv": - if "infer_schema_length" in read_options: + if ("infer_schema_length" in read_options) and ( + infer_schema_length != N_INFER_DEFAULT + ): msg = 'cannot specify both `infer_schema_length` and `read_options["infer_schema_length"]`' raise ParameterCollisionError(msg) read_options["infer_schema_length"] = infer_schema_length From 5996d1ec0bf83499736d36ce5a34cb119c8afacb Mon Sep 17 00:00:00 2001 From: nameexhaustion Date: Wed, 8 May 2024 18:29:57 +1000 Subject: [PATCH 08/11] Revert "feat(rust): Add RLE to `RLE_DICTIONARY` encoder" (#16113) --- .../src/compute/cast/binary_to.rs | 1 - .../src/compute/cast/binview_to.rs | 2 - .../src/compute/cast/primitive_to.rs | 1 - .../polars-arrow/src/compute/cast/utf8_to.rs | 1 - crates/polars-io/src/parquet/write/writer.rs | 2 +- .../src/arrow/write/dictionary.rs | 41 ++- crates/polars-parquet/src/arrow/write/mod.rs | 2 +- .../src/arrow/write/nested/mod.rs | 10 +- .../polars-parquet/src/arrow/write/utils.rs | 6 +- .../parquet/encoding/hybrid_rle/encoder.rs | 289 ++++++------------ .../src/parquet/encoding/hybrid_rle/mod.rs | 4 +- .../tests/it/io/parquet/write/binary.rs | 4 +- .../tests/it/io/parquet/write/primitive.rs | 4 +- py-polars/tests/unit/io/test_parquet.py | 43 --- 14 files changed, 136 insertions(+), 274 deletions(-) diff --git a/crates/polars-arrow/src/compute/cast/binary_to.rs b/crates/polars-arrow/src/compute/cast/binary_to.rs index d5e8bfb30852..c7970fe6a051 100644 --- a/crates/polars-arrow/src/compute/cast/binary_to.rs +++ 
b/crates/polars-arrow/src/compute/cast/binary_to.rs
@@ -139,7 +139,6 @@ pub fn binary_to_dictionary<O: Offset, K: DictionaryKey>(
     from: &BinaryArray<O>,
 ) -> PolarsResult<DictionaryArray<K>> {
     let mut array = MutableDictionaryArray::<K, MutableBinaryArray<O>>::new();
-    array.reserve(from.len());
     array.try_extend(from.iter())?;
 
     Ok(array.into())
diff --git a/crates/polars-arrow/src/compute/cast/binview_to.rs b/crates/polars-arrow/src/compute/cast/binview_to.rs
index 1c157110ec49..8c7ef4c2453a 100644
--- a/crates/polars-arrow/src/compute/cast/binview_to.rs
+++ b/crates/polars-arrow/src/compute/cast/binview_to.rs
@@ -21,7 +21,6 @@ pub(super) fn binview_to_dictionary<K: DictionaryKey>(
     from: &BinaryViewArray,
 ) -> PolarsResult<DictionaryArray<K>> {
     let mut array = MutableDictionaryArray::<K, MutableBinaryViewArray<[u8]>>::new();
-    array.reserve(from.len());
     array.try_extend(from.iter())?;
 
     Ok(array.into())
@@ -31,7 +30,6 @@ pub(super) fn utf8view_to_dictionary<K: DictionaryKey>(
     from: &Utf8ViewArray,
 ) -> PolarsResult<DictionaryArray<K>> {
     let mut array = MutableDictionaryArray::<K, MutableBinaryViewArray<str>>::new();
-    array.reserve(from.len());
     array.try_extend(from.iter())?;
 
     Ok(array.into())
diff --git a/crates/polars-arrow/src/compute/cast/primitive_to.rs b/crates/polars-arrow/src/compute/cast/primitive_to.rs
index 583b6ab19a96..d0d2056b70de 100644
--- a/crates/polars-arrow/src/compute/cast/primitive_to.rs
+++ b/crates/polars-arrow/src/compute/cast/primitive_to.rs
@@ -318,7 +318,6 @@ pub fn primitive_to_dictionary<T: NativeType + Eq + Hash, K: DictionaryKey>(
     let mut array = MutableDictionaryArray::<K, _>::try_empty(MutablePrimitiveArray::<T>::from(
         from.data_type().clone(),
     ))?;
-    array.reserve(from.len());
     array.try_extend(iter)?;
 
     Ok(array.into())
diff --git a/crates/polars-arrow/src/compute/cast/utf8_to.rs b/crates/polars-arrow/src/compute/cast/utf8_to.rs
index 85b478c43817..4df2876d394e 100644
--- a/crates/polars-arrow/src/compute/cast/utf8_to.rs
+++ b/crates/polars-arrow/src/compute/cast/utf8_to.rs
@@ -27,7 +27,6 @@ pub fn utf8_to_dictionary<O: Offset, K: DictionaryKey>(
     from: &Utf8Array<O>,
 ) -> PolarsResult<DictionaryArray<K>> {
     let mut array = MutableDictionaryArray::<K, MutableUtf8Array<O>>::new();
-    array.reserve(from.len());
     array.try_extend(from.iter())?;
 
     Ok(array.into())
diff --git a/crates/polars-io/src/parquet/write/writer.rs b/crates/polars-io/src/parquet/write/writer.rs
index 620ac11c3351..2408d66e9ba2 100644
--- a/crates/polars-io/src/parquet/write/writer.rs
+++ b/crates/polars-io/src/parquet/write/writer.rs
@@ -102,7 +102,7 @@ where
         WriteOptions {
             write_statistics: self.statistics,
             compression: self.compression,
-            version: Version::V1,
+            version: Version::V2,
             data_pagesize_limit: self.data_page_size,
         }
     }
diff --git a/crates/polars-parquet/src/arrow/write/dictionary.rs b/crates/polars-parquet/src/arrow/write/dictionary.rs
index 0525578589eb..b3ea666865c9 100644
--- a/crates/polars-parquet/src/arrow/write/dictionary.rs
+++ b/crates/polars-parquet/src/arrow/write/dictionary.rs
@@ -1,6 +1,7 @@
 use arrow::array::{Array, BinaryViewArray, DictionaryArray, DictionaryKey, Utf8ViewArray};
 use arrow::bitmap::{Bitmap, MutableBitmap};
 use arrow::datatypes::{ArrowDataType, IntegerType};
+use num_traits::ToPrimitive;
 use polars_error::{polars_bail, PolarsResult};
 
 use super::binary::{
@@ -15,19 +16,23 @@ use super::primitive::{
 use super::{binview, nested, Nested, WriteOptions};
 use crate::arrow::read::schema::is_nullable;
 use crate::arrow::write::{slice_nested_leaf, utils};
-use crate::parquet::encoding::hybrid_rle::encode;
+use crate::parquet::encoding::hybrid_rle::encode_u32;
 use crate::parquet::encoding::Encoding;
 use crate::parquet::page::{DictPage, Page};
 use crate::parquet::schema::types::PrimitiveType;
 use crate::parquet::statistics::{serialize_statistics, ParquetStatistics};
-use crate::write::DynIter;
+use crate::write::{to_nested, DynIter, ParquetType};
 
 pub(crate) fn encode_as_dictionary_optional(
     array: &dyn Array,
-    nested: &[Nested],
     type_: PrimitiveType,
     options: WriteOptions,
 ) -> Option<PolarsResult<DynIter<'static, PolarsResult<Page>>>> {
+    let nested = to_nested(array, &ParquetType::PrimitiveType(type_.clone()))
+        .ok()?
+        .pop()
+        .unwrap();
+
     let dtype = Box::new(array.data_type().clone());
 
     let len_before = array.len();
@@ -47,11 +52,35 @@ pub(crate) fn encode_as_dictionary_optional(
     if (array.values().len() as f64) / (len_before as f64) > 0.75 {
         return None;
     }
+    if array.values().len().to_u16().is_some() {
+        let array = arrow::compute::cast::cast(
+            array,
+            &ArrowDataType::Dictionary(
+                IntegerType::UInt16,
+                Box::new(array.values().data_type().clone()),
+                false,
+            ),
+            Default::default(),
+        )
+        .unwrap();
+
+        let array = array
+            .as_any()
+            .downcast_ref::<DictionaryArray<u16>>()
+            .unwrap();
+        return Some(array_to_pages(
+            array,
+            type_,
+            &nested,
+            options,
+            Encoding::RleDictionary,
+        ));
+    }
 
     Some(array_to_pages(
         array,
         type_,
-        nested,
+        &nested,
         options,
         Encoding::RleDictionary,
     ))
@@ -87,7 +116,7 @@ fn serialize_keys_values<K: DictionaryKey>(
         buffer.push(num_bits as u8);
 
         // followed by the encoded indices.
-        Ok(encode::<u32, _, _>(buffer, keys, num_bits)?)
+        Ok(encode_u32(buffer, keys, num_bits)?)
     } else {
         let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64);
 
@@ -95,7 +124,7 @@ fn serialize_keys_values<K: DictionaryKey>(
         buffer.push(num_bits as u8);
 
         // followed by the encoded indices.
-        Ok(encode::<u32, _, _>(buffer, keys, num_bits)?)
+        Ok(encode_u32(buffer, keys, num_bits)?)
     }
 }
diff --git a/crates/polars-parquet/src/arrow/write/mod.rs b/crates/polars-parquet/src/arrow/write/mod.rs
index 65e03cecaae4..a980177c4835 100644
--- a/crates/polars-parquet/src/arrow/write/mod.rs
+++ b/crates/polars-parquet/src/arrow/write/mod.rs
@@ -219,7 +219,7 @@ pub fn array_to_pages(
     // Only take this path for primitive columns
     if matches!(nested.first(), Some(Nested::Primitive(_, _, _))) {
         if let Some(result) =
-            encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options)
+            encode_as_dictionary_optional(primitive_array, type_.clone(), options)
         {
             return result;
         }
diff --git a/crates/polars-parquet/src/arrow/write/nested/mod.rs b/crates/polars-parquet/src/arrow/write/nested/mod.rs
index 9aed392a06ee..46e15eec6c72 100644
--- a/crates/polars-parquet/src/arrow/write/nested/mod.rs
+++ b/crates/polars-parquet/src/arrow/write/nested/mod.rs
@@ -6,7 +6,7 @@ use polars_error::PolarsResult;
 pub use rep::num_values;
 
 use super::Nested;
-use crate::parquet::encoding::hybrid_rle::encode;
+use crate::parquet::encoding::hybrid_rle::encode_u32;
 use crate::parquet::read::levels::get_bit_width;
 use crate::parquet::write::Version;
 
@@ -41,12 +41,12 @@ fn write_rep_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -
     match version {
         Version::V1 => {
             write_levels_v1(buffer, |buffer: &mut Vec<u8>| {
-                encode::<u32, _, _>(buffer, levels, num_bits)?;
+                encode_u32(buffer, levels, num_bits)?;
                 Ok(())
             })?;
         },
         Version::V2 => {
-            encode::<u32, _, _>(buffer, levels, num_bits)?;
+            encode_u32(buffer, levels, num_bits)?;
         },
     }
 
@@ -65,10 +65,10 @@ fn write_def_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -
 
     match version {
         Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec<u8>| {
-            encode::<u32, _, _>(buffer, levels, num_bits)?;
+            encode_u32(buffer, levels, num_bits)?;
             Ok(())
         }),
-        Version::V2 => Ok(encode::<u32, _, _>(buffer, levels, num_bits)?),
+        Version::V2 => Ok(encode_u32(buffer, levels, num_bits)?),
     }
 }
diff --git a/crates/polars-parquet/src/arrow/write/utils.rs b/crates/polars-parquet/src/arrow/write/utils.rs
index 0ba9f4289bab..2032029b2de4 100644
--- a/crates/polars-parquet/src/arrow/write/utils.rs
+++ b/crates/polars-parquet/src/arrow/write/utils.rs
@@ -4,7 +4,7 @@ use polars_error::*;
 
 use super::{Version, WriteOptions};
 use crate::parquet::compression::CompressionOptions;
-use crate::parquet::encoding::hybrid_rle::encode;
+use crate::parquet::encoding::hybrid_rle::encode_bool;
 use crate::parquet::encoding::Encoding;
 use crate::parquet::metadata::Descriptor;
 use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2};
@@ -14,7 +14,7 @@ use crate::parquet::statistics::ParquetStatistics;
 fn encode_iter_v1<I: Iterator<Item = bool>>(buffer: &mut Vec<u8>, iter: I) -> PolarsResult<()> {
     buffer.extend_from_slice(&[0; 4]);
     let start = buffer.len();
-    encode::<bool, _, _>(buffer, iter, 1)?;
+    encode_bool(buffer, iter)?;
    let end = buffer.len();
     let length = end - start;
 
@@ -25,7 +25,7 @@ fn encode_iter_v1<I: Iterator<Item = bool>>(buffer: &mut Vec<u8>, iter: I) -> Po
 }
 
 fn encode_iter_v2<I: Iterator<Item = bool>>(writer: &mut Vec<u8>, iter: I) -> PolarsResult<()> {
-    Ok(encode::<bool, _, _>(writer, iter, 1)?)
+    Ok(encode_bool(writer, iter)?)
 }
 
 fn encode_iter<I: Iterator<Item = bool>>(
diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs
index 963499cf324f..1c4dd67ccec7 100644
--- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs
+++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs
@@ -3,216 +3,98 @@ use std::io::Write;
 use super::bitpacked_encode;
 use crate::parquet::encoding::{bitpacked, ceil8, uleb128};
 
-// Arbitrary value that balances memory usage and storage overhead
-const MAX_VALUES_PER_LITERAL_RUN: usize = (1 << 10) * 8;
-
-pub trait Encoder<T> {
-    fn bitpacked_encode<W: Write, I: Iterator<Item = T>>(
-        writer: &mut W,
-        iterator: I,
-        num_bits: usize,
-    ) -> std::io::Result<()>;
-
-    fn run_length_encode<W: Write>(
-        writer: &mut W,
-        run_length: usize,
-        value: T,
-        bit_width: u32,
-    ) -> std::io::Result<()>;
-}
+/// RLE-hybrid encoding of `u32`. This currently only yields bitpacked values.
+pub fn encode_u32<W: Write, I: Iterator<Item = u32>>(
+    writer: &mut W,
+    iterator: I,
+    num_bits: u32,
+) -> std::io::Result<()> {
+    let num_bits = num_bits as u8;
+    // the length of the iterator.
+    let length = iterator.size_hint().1.unwrap();
+
+    // write the length + indicator
+    let mut header = ceil8(length) as u64;
+    header <<= 1;
+    header |= 1; // it is bitpacked => first bit is set
+    let mut container = [0; 10];
+    let used = uleb128::encode(header, &mut container);
+    writer.write_all(&container[..used])?;
+
+    bitpacked_encode_u32(writer, iterator, num_bits as usize)?;
+
+    Ok(())
+}
 
 const U32_BLOCK_LEN: usize = 32;
 
-impl Encoder<u32> for u32 {
-    fn bitpacked_encode<W: Write, I: Iterator<Item = u32>>(
-        writer: &mut W,
-        mut iterator: I,
-        num_bits: usize,
-    ) -> std::io::Result<()> {
-        // the length of the iterator.
-        let length = iterator.size_hint().1.unwrap();
-
-        let mut header = ceil8(length) as u64;
-        header <<= 1;
-        header |= 1; // it is bitpacked => first bit is set
-        let mut container = [0; 10];
-        let used = uleb128::encode(header, &mut container);
-        writer.write_all(&container[..used])?;
-
-        let chunks = length / U32_BLOCK_LEN;
-        let remainder = length - chunks * U32_BLOCK_LEN;
-        let mut buffer = [0u32; U32_BLOCK_LEN];
-
-        // simplified from ceil8(U32_BLOCK_LEN * num_bits) since U32_BLOCK_LEN = 32
-        let compressed_chunk_size = 4 * num_bits;
-
-        for _ in 0..chunks {
-            iterator
-                .by_ref()
-                .take(U32_BLOCK_LEN)
-                .zip(buffer.iter_mut())
-                .for_each(|(item, buf)| *buf = item);
-
-            let mut packed = [0u8; 4 * U32_BLOCK_LEN];
-            bitpacked::encode_pack::<u32>(&buffer, num_bits, packed.as_mut());
-            writer.write_all(&packed[..compressed_chunk_size])?;
-        }
-
-        if remainder != 0 {
-            // Must be careful here to ensure we write a multiple of `num_bits`
-            // (the bit width) to align with the spec. Some readers also rely on
-            // this - see https://github.com/pola-rs/polars/pull/13883.
-
-            // this is ceil8(remainder * num_bits), but we ensure the output is a
-            // multiple of num_bits by rewriting it as ceil8(remainder) * num_bits
-            let compressed_remainder_size = ceil8(remainder) * num_bits;
-            iterator
-                .by_ref()
-                .take(remainder)
-                .zip(buffer.iter_mut())
-                .for_each(|(item, buf)| *buf = item);
-
-            let mut packed = [0u8; 4 * U32_BLOCK_LEN];
-            bitpacked::encode_pack(&buffer[..remainder], num_bits, packed.as_mut());
-            writer.write_all(&packed[..compressed_remainder_size])?;
-        };
-        Ok(())
-    }
-
-    fn run_length_encode<W: Write>(
-        writer: &mut W,
-        run_length: usize,
-        value: u32,
-        bit_width: u32,
-    ) -> std::io::Result<()> {
-        // write the length + indicator
-        let mut header = run_length as u64;
-        header <<= 1;
-        let mut container = [0; 10];
-        let used = uleb128::encode(header, &mut container);
-        writer.write_all(&container[..used])?;
-
-        let num_bytes = ceil8(bit_width as usize);
-        let bytes = value.to_le_bytes();
-        writer.write_all(&bytes[..num_bytes])?;
-        Ok(())
-    }
-}
+fn bitpacked_encode_u32<W: Write, I: Iterator<Item = u32>>(
+    writer: &mut W,
+    mut iterator: I,
+    num_bits: usize,
+) -> std::io::Result<()> {
+    // the length of the iterator.
+    let length = iterator.size_hint().1.unwrap();
+
+    let chunks = length / U32_BLOCK_LEN;
+    let remainder = length - chunks * U32_BLOCK_LEN;
+    let mut buffer = [0u32; U32_BLOCK_LEN];
+
+    // simplified from ceil8(U32_BLOCK_LEN * num_bits) since U32_BLOCK_LEN = 32
+    let compressed_chunk_size = 4 * num_bits;
+
+    for _ in 0..chunks {
+        iterator
+            .by_ref()
+            .take(U32_BLOCK_LEN)
+            .zip(buffer.iter_mut())
+            .for_each(|(item, buf)| *buf = item);
+
+        let mut packed = [0u8; 4 * U32_BLOCK_LEN];
+        bitpacked::encode_pack::<u32>(&buffer, num_bits, packed.as_mut());
+        writer.write_all(&packed[..compressed_chunk_size])?;
+    }
+
+    if remainder != 0 {
+        // Must be careful here to ensure we write a multiple of `num_bits`
+        // (the bit width) to align with the spec. Some readers also rely on
+        // this - see https://github.com/pola-rs/polars/pull/13883.
+
+        // this is ceil8(remainder * num_bits), but we ensure the output is a
+        // multiple of num_bits by rewriting it as ceil8(remainder) * num_bits
+        let compressed_remainder_size = ceil8(remainder) * num_bits;
+        iterator
+            .by_ref()
+            .take(remainder)
+            .zip(buffer.iter_mut())
+            .for_each(|(item, buf)| *buf = item);
+
+        let mut packed = [0u8; 4 * U32_BLOCK_LEN];
+        bitpacked::encode_pack(&buffer, num_bits, packed.as_mut());
+        writer.write_all(&packed[..compressed_remainder_size])?;
+    };
+    Ok(())
+}
 
-impl Encoder<bool> for bool {
-    fn bitpacked_encode<W: Write, I: Iterator<Item = bool>>(
-        writer: &mut W,
-        iterator: I,
-        _num_bits: usize,
-    ) -> std::io::Result<()> {
-        // the length of the iterator.
-        let length = iterator.size_hint().1.unwrap();
-
-        let mut header = ceil8(length) as u64;
-        header <<= 1;
-        header |= 1; // it is bitpacked => first bit is set
-        let mut container = [0; 10];
-        let used = uleb128::encode(header, &mut container);
-        writer.write_all(&container[..used])?;
-        bitpacked_encode(writer, iterator)?;
-        Ok(())
-    }
-
-    fn run_length_encode<W: Write>(
-        writer: &mut W,
-        run_length: usize,
-        value: bool,
-        _bit_width: u32,
-    ) -> std::io::Result<()> {
-        // write the length + indicator
-        let mut header = run_length as u64;
-        header <<= 1;
-        let mut container = [0; 10];
-        let used = uleb128::encode(header, &mut container);
-        writer.write_all(&container[..used])?;
-        writer.write_all(&(value as u8).to_le_bytes())?;
-        Ok(())
-    }
-}
-
-#[allow(clippy::comparison_chain)]
-pub fn encode<T: PartialEq + Default + Copy + Encoder<T>, W: Write, I: Iterator<Item = T>>(
+/// the bitpacked part of the encoder.
+pub fn encode_bool<W: Write, I: Iterator<Item = bool>>(
     writer: &mut W,
     iterator: I,
-    num_bits: u32,
 ) -> std::io::Result<()> {
-    let mut consecutive_repeats: usize = 0;
-    let mut previous_val = T::default();
-    let mut buffered_bits = [previous_val; MAX_VALUES_PER_LITERAL_RUN];
-    let mut buffer_idx = 0;
-    let mut literal_run_idx = 0;
-    for val in iterator {
-        if val == previous_val {
-            consecutive_repeats += 1;
-            // Run is long enough to RLE, no need to buffer values
-            if consecutive_repeats >= 8 {
-                // Run is long enough to RLE, no need to buffer values
-                if consecutive_repeats > 8 {
-                    continue;
-                } else {
-                    // Ensure literal run has multiple of 8 values
-                    // Take from consecutive repeats if needed to pad up
-                    let literal_padding = (8 - (literal_run_idx % 8)) % 8;
-                    consecutive_repeats -= literal_padding;
-                    literal_run_idx += literal_padding;
-                }
-            }
-            // Too short to RLE, continue to buffer values
-        } else if consecutive_repeats > 8 {
-            // Flush literal run, if any, before RLE run
-            if literal_run_idx > 0 {
-                T::bitpacked_encode(
-                    writer,
-                    buffered_bits.iter().copied().take(literal_run_idx),
-                    num_bits as usize,
-                )?;
-                literal_run_idx = 0;
-            }
-            T::run_length_encode(writer, consecutive_repeats, previous_val, num_bits)?;
-            consecutive_repeats = 1;
-            buffer_idx = 0;
-        } else {
-            // Not enough consecutive repeats to RLE, extend literal run
-            literal_run_idx = buffer_idx;
-            consecutive_repeats = 1;
-        }
-        // If buffer is full, bit-pack as literal run and reset
-        if buffer_idx == MAX_VALUES_PER_LITERAL_RUN {
-            T::bitpacked_encode(
-                writer,
-                buffered_bits.iter().copied().take(literal_run_idx),
-                num_bits as usize,
-            )?;
-            // Consecutive repeats may be consolidated into literal run
-            consecutive_repeats -= buffer_idx - literal_run_idx;
-            buffer_idx = 0;
-            literal_run_idx = 0;
-        }
-        buffered_bits[buffer_idx] = val;
-        previous_val = val;
-        buffer_idx += 1;
-    }
-    // Not enough consecutive repeats to RLE, extend literal run
-    if consecutive_repeats <= 8 {
-        literal_run_idx = buffer_idx;
-        consecutive_repeats = 0;
-    }
-    if literal_run_idx > 0 {
-        T::bitpacked_encode(
-            writer,
-            buffered_bits.iter().copied().take(literal_run_idx),
-            num_bits as usize,
-        )?;
-    }
-    if consecutive_repeats > 8 {
-        T::run_length_encode(writer, consecutive_repeats, previous_val, num_bits)?;
-    }
-    Ok(())
+    // the length of the iterator.
+    let length = iterator.size_hint().1.unwrap();
+
+    // write the length + indicator
+    let mut header = ceil8(length) as u64;
+    header <<= 1;
+    header |= 1; // it is bitpacked => first bit is set
+    let mut container = [0; 10];
+    let used = uleb128::encode(header, &mut container);
+
+    writer.write_all(&container[..used])?;
+
+    // encode the iterator
+    bitpacked_encode(writer, iterator)
 }
 
 #[cfg(test)]
@@ -226,7 +108,7 @@ mod tests {
 
         let mut vec = vec![];
 
-        encode::<bool, _, _>(&mut vec, iter, 1)?;
+        encode_bool(&mut vec, iter)?;
 
         assert_eq!(vec, vec![(2 << 1 | 1), 0b10011101u8, 0b00011101]);
 
@@ -237,10 +119,9 @@ mod tests {
     fn bool_from_iter() -> std::io::Result<()> {
         let mut vec = vec![];
 
-        encode::<bool, _, _>(
+        encode_bool(
             &mut vec,
             vec![true, true, true, true, true, true, true, true].into_iter(),
-            1,
         )?;
 
         assert_eq!(vec, vec![(1 << 1 | 1), 0b11111111]);
@@ -251,7 +132,7 @@ mod tests {
     fn test_encode_u32() -> std::io::Result<()> {
         let mut vec = vec![];
 
-        encode::<u32, _, _>(&mut vec, vec![0, 1, 2, 1, 2, 1, 1, 0, 3].into_iter(), 2)?;
+        encode_u32(&mut vec, vec![0, 1, 2, 1, 2, 1, 1, 0, 3].into_iter(), 2)?;
 
         assert_eq!(
             vec,
@@ -272,7 +153,7 @@ mod tests {
 
         let values = (0..128).map(|x| x % 4);
 
-        encode::<u32, _, _>(&mut vec, values, 2)?;
+        encode_u32(&mut vec, values, 2)?;
 
         let length = 128;
         let expected = 0b11_10_01_00u8;
@@ -289,7 +170,7 @@ mod tests {
         let values = vec![3, 3, 0, 3, 2, 3, 3, 3, 3, 1, 3, 3, 3, 0, 3].into_iter();
         let mut vec = vec![];
 
-        encode::<u32, _, _>(&mut vec, values, 2)?;
+        encode_u32(&mut vec, values, 2)?;
 
         let expected = vec![5, 207, 254, 247, 51];
         assert_eq!(expected, vec);
diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs
index 89816f87fb54..3dc072552524 100644
--- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs
+++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs
@@ -4,7 +4,7 @@ mod decoder;
 mod encoder;
 pub use bitmap::{encode_bool as bitpacked_encode, BitmapIter};
 pub use decoder::Decoder;
-pub use encoder::encode;
+pub use encoder::{encode_bool, encode_u32};
 use polars_utils::iter::FallibleIterator;
 
 use super::bitpacked;
@@ -137,7 +137,7 @@ mod tests {
 
         let data = (0..1000).collect::<Vec<u32>>();
 
-        encode::<u32, _, _>(&mut buffer, data.iter().cloned(), num_bits).unwrap();
+        encode_u32(&mut buffer, data.iter().cloned(), num_bits).unwrap();
 
         let decoder = HybridRleDecoder::try_new(&buffer, num_bits, data.len())?;
 
diff --git a/crates/polars/tests/it/io/parquet/write/binary.rs b/crates/polars/tests/it/io/parquet/write/binary.rs
index dd4e3a942c46..3112f115c3e7 100644
--- a/crates/polars/tests/it/io/parquet/write/binary.rs
+++ b/crates/polars/tests/it/io/parquet/write/binary.rs
@@ -1,4 +1,4 @@
-use polars_parquet::parquet::encoding::hybrid_rle::encode;
+use polars_parquet::parquet::encoding::hybrid_rle::encode_bool;
 use polars_parquet::parquet::encoding::Encoding;
 use polars_parquet::parquet::error::Result;
 use polars_parquet::parquet::metadata::Descriptor;
@@ -25,7 +25,7 @@ fn unzip_option(array: &[Option<Vec<u8>>]) -> Result<(Vec<u8>, Vec<u8>)> {
             false
         }
     });
-    encode::<bool, _, _>(&mut validity, iter, 1)?;
+    encode_bool(&mut validity, iter)?;
 
     // write the length, now that it is known
     let mut validity = validity.into_inner();
diff --git a/crates/polars/tests/it/io/parquet/write/primitive.rs b/crates/polars/tests/it/io/parquet/write/primitive.rs
index e5da32252e99..3b5ae150896a 100644
--- a/crates/polars/tests/it/io/parquet/write/primitive.rs
+++ b/crates/polars/tests/it/io/parquet/write/primitive.rs
@@ -1,4 +1,4 @@
-use polars_parquet::parquet::encoding::hybrid_rle::encode;
+use polars_parquet::parquet::encoding::hybrid_rle::encode_bool;
 use polars_parquet::parquet::encoding::Encoding;
 use polars_parquet::parquet::error::Result;
 use polars_parquet::parquet::metadata::Descriptor;
@@ -24,7 +24,7 @@ fn unzip_option<T: NativeType>(array: &[Option<T>]) -> Result<(Vec<u8>, Vec<u8>)> {
             false
         }
     });
-    encode::<bool, _, _>(&mut validity, iter, 1)?;
+    encode_bool(&mut validity, iter)?;
 
     // write the length, now that it is known
     let mut validity = validity.into_inner();
diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py
index 846b4252e548..12ac1a835b40 100644
--- a/py-polars/tests/unit/io/test_parquet.py
+++ b/py-polars/tests/unit/io/test_parquet.py
@@ -892,46 +892,3 @@ def test_no_glob_windows(tmp_path: Path) -> None:
         df.write_parquet(str(p2))
 
     assert_frame_equal(pl.scan_parquet(str(p1), glob=False).collect(), df)
-
-
-@pytest.mark.slow()
-def test_hybrid_rle() -> None:
-    df = pl.DataFrame(
-        {
-            # Test primitive types
-            "i64": pl.repeat(int(2**63 - 1), n=10000, dtype=pl.Int64, eager=True),
-            "u64": pl.repeat(int(2**64 - 1), n=10000, dtype=pl.UInt64, eager=True),
-            "i8": pl.repeat(-int(2**7 - 1), n=10000, dtype=pl.Int8, eager=True),
-            "u8": pl.repeat(int(2**8 - 1), n=10000, dtype=pl.UInt8, eager=True),
-            "string": pl.repeat("a", n=10000, dtype=pl.String, eager=True),
-            "categorical": pl.Series((["a"] * 9 + ["b"]) * 1000, dtype=pl.Categorical),
-            # Test filling up bit-packing buffer
-            "large_bit_pack": ([0] * 5 + [1] * 5) * 1000,
-            # Test mix of bit-packed and RLE runs
-            "bit_pack_and_rle": (
-                [0] + [1] * 19 + [2] * 8 + [3] * 12 + [4] * 5 + [5] * 5
-            )
-            * 200,
-            # Test some null values
-            "nulls_included": (
-                [None] + [1] * 19 + [None] * 8 + [3] * 12 + [4] * 5 + [None] * 5
-            )
-            * 200,
-            # Test filling up bit-packing buffer for encode_bool,
-            # which is only used to encode validities
-            # Also checks that runs are handled correctly if buffer
-            # is flushed (at MAX_VALUES_PER_LITERAL_RUN values)
-            "large_bit_pack_validity": [0, None] * 4092
-            + [0] * 9
-            + [1] * 9
-            + [2] * 10
-            + [0] * 1788,
-        }
-    )
-    f = io.BytesIO()
-    df.write_parquet(f)
-    f.seek(0)
-    for column in pq.ParquetFile(f).metadata.to_dict()["row_groups"][0]["columns"]:
-        assert "RLE_DICTIONARY" in column["encodings"]
-    f.seek(0)
-    assert_frame_equal(pl.read_parquet(f), df)

From a3ebdfca9b97ab3c045ef911eb9b5348acad1c65 Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Wed, 8 May 2024 10:36:36 +0200
Subject: [PATCH 09/11] python polars 0.20.25 (#16114)

---
 Cargo.lock           | 2 +-
 py-polars/Cargo.toml | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 528322bc0fff..661c396729e8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3249,7 +3249,7 @@ dependencies = [
 
 [[package]]
 name = "py-polars"
-version = "0.20.24"
+version = "0.20.25"
 dependencies = [
  "ahash",
  "arboard",
diff --git a/py-polars/Cargo.toml b/py-polars/Cargo.toml
index d296c96c7c11..cb530b2d1445 100644
--- a/py-polars/Cargo.toml
+++ b/py-polars/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "py-polars"
-version = "0.20.24"
+version = "0.20.25"
 edition = "2021"
 
 [lib]

From ddc30ab8f0babc453215b24b3517462f91315da3 Mon Sep 17 00:00:00 2001
From: Alexander Beedie
Date: Wed, 8 May 2024 14:51:11 +0400
Subject: [PATCH 10/11] feat(python):
Add top-level `nth(n)` method, to go with existing `first` and `last` (#16112) --- crates/polars-plan/src/dsl/mod.rs | 9 +- .../reference/expressions/functions.rst | 1 + py-polars/polars/__init__.py | 2 + py-polars/polars/functions/__init__.py | 2 + py-polars/polars/functions/lazy.py | 96 ++++++++++++++++--- py-polars/src/functions/lazy.rs | 5 + py-polars/src/lib.rs | 1 + py-polars/tests/unit/dataframe/test_df.py | 11 ++- .../tests/unit/functions/test_functions.py | 3 + 9 files changed, 112 insertions(+), 18 deletions(-) diff --git a/crates/polars-plan/src/dsl/mod.rs b/crates/polars-plan/src/dsl/mod.rs index ac3439d20e3b..015e999851df 100644 --- a/crates/polars-plan/src/dsl/mod.rs +++ b/crates/polars-plan/src/dsl/mod.rs @@ -1885,12 +1885,17 @@ pub fn len() -> Expr { Expr::Len } -/// First column in DataFrame. +/// First column in a DataFrame. pub fn first() -> Expr { Expr::Nth(0) } -/// Last column in DataFrame. +/// Last column in a DataFrame. pub fn last() -> Expr { Expr::Nth(-1) } + +/// Nth column in a DataFrame. +pub fn nth(n: i64) -> Expr { + Expr::Nth(n) +} diff --git a/py-polars/docs/source/reference/expressions/functions.rst b/py-polars/docs/source/reference/expressions/functions.rst index 546c44360332..7fdde4dddac1 100644 --- a/py-polars/docs/source/reference/expressions/functions.rst +++ b/py-polars/docs/source/reference/expressions/functions.rst @@ -71,6 +71,7 @@ These functions are available from the Polars module root and can be used as exp min min_horizontal n_unique + nth ones quantile reduce diff --git a/py-polars/polars/__init__.py b/py-polars/polars/__init__.py index a2eaf7b826eb..29c5a66f061d 100644 --- a/py-polars/polars/__init__.py +++ b/py-polars/polars/__init__.py @@ -158,6 +158,7 @@ min, min_horizontal, n_unique, + nth, ones, quantile, reduce, @@ -401,6 +402,7 @@ "mean", "median", "n_unique", + "nth", "quantile", "reduce", "rolling_corr", diff --git a/py-polars/polars/functions/__init__.py b/py-polars/polars/functions/__init__.py index 048587300c21..b658e5b4b3e9 100644 --- a/py-polars/polars/functions/__init__.py +++ b/py-polars/polars/functions/__init__.py @@ -61,6 +61,7 @@ mean, median, n_unique, + nth, quantile, reduce, rolling_corr, @@ -162,6 +163,7 @@ "mean_horizontal", "median", "n_unique", + "nth", "quantile", "reduce", "rolling_corr", diff --git a/py-polars/polars/functions/lazy.py b/py-polars/polars/functions/lazy.py index 1a9059e00324..99a881cf6373 100644 --- a/py-polars/polars/functions/lazy.py +++ b/py-polars/polars/functions/lazy.py @@ -22,7 +22,6 @@ with contextlib.suppress(ImportError): # Module not available when building docs import polars.polars as plr - if TYPE_CHECKING: from typing import Awaitable, Collection, Literal @@ -518,18 +517,16 @@ def approx_n_unique(*columns: str) -> Expr: @deprecate_parameter_as_positional("column", version="0.20.4") def first(*columns: str) -> Expr: """ - Get the first value. - - This function has different behavior depending on the input type: + Get the first column or value. - - `None` -> Takes first column of a context (equivalent to `cs.first()`). - - `str` or `[str,]` -> Syntactic sugar for `pl.col(columns).first()`. + This function has different behavior depending on the presence of `columns` + values. If none given (the default), returns an expression that takes the first + column of the context; otherwise, takes the first value of the given column(s). Parameters ---------- *columns - One or more column names. If not provided (default), returns an expression - to take the first column of the context instead. 
+ One or more column names. Examples -------- @@ -540,6 +537,9 @@ def first(*columns: str) -> Expr: ... "c": ["foo", "bar", "baz"], ... } ... ) + + Return the first column: + >>> df.select(pl.first()) shape: (3, 1) ┌─────┐ @@ -551,6 +551,9 @@ def first(*columns: str) -> Expr: │ 8 │ │ 3 │ └─────┘ + + Return the first value for the given column(s): + >>> df.select(pl.first("b")) shape: (1, 1) ┌─────┐ @@ -580,18 +583,16 @@ def first(*columns: str) -> Expr: @deprecate_parameter_as_positional("column", version="0.20.4") def last(*columns: str) -> Expr: """ - Get the last value. + Get the last column or value. - This function has different behavior depending on the input type: - - - `None` -> Takes last column of a context (equivalent to `cs.last()`). - - `str` or `[str,]` -> Syntactic sugar for `pl.col(columns).last()`. + This function has different behavior depending on the presence of `columns` + values. If none given (the default), returns an expression that takes the last + column of the context; otherwise, takes the last value of the given column(s). Parameters ---------- *columns - One or more column names. If set to `None` (default), returns an expression - to take the last column of the context instead. + One or more column names. Examples -------- @@ -602,6 +603,9 @@ def last(*columns: str) -> Expr: ... "c": ["foo", "bar", "baz"], ... } ... ) + + Return the last column: + >>> df.select(pl.last()) shape: (3, 1) ┌─────┐ @@ -613,6 +617,9 @@ def last(*columns: str) -> Expr: │ bar │ │ baz │ └─────┘ + + Return the last value for the given column(s): + >>> df.select(pl.last("a")) shape: (1, 1) ┌─────┐ @@ -639,6 +646,65 @@ def last(*columns: str) -> Expr: return F.col(*columns).last() +def nth(n: int, *columns: str) -> Expr: + """ + Get the nth column or value. + + This function has different behavior depending on the presence of `columns` + values. If none given (the default), returns an expression that takes the nth + column of the context; otherwise, takes the nth value of the given column(s). + + Parameters + ---------- + n + Index of the column (or value) to get. + *columns + One or more column names. If omitted (the default), returns an + expression that takes the nth column of the context. Otherwise, + returns takes the nth value of the given column(s). + + Examples + -------- + >>> df = pl.DataFrame( + ... { + ... "a": [1, 8, 3], + ... "b": [4, 5, 2], + ... "c": ["foo", "bar", "baz"], + ... } + ... ) + + Return the "nth" column: + + >>> df.select(pl.nth(1)) + shape: (3, 1) + ┌─────┐ + │ b │ + │ --- │ + │ i64 │ + ╞═════╡ + │ 4 │ + │ 5 │ + │ 2 │ + └─────┘ + + Return the "nth" value for the given columns: + + >>> df.select(pl.nth(-2, "b", "c")) + shape: (1, 2) + ┌─────┬─────┐ + │ b ┆ c │ + │ --- ┆ --- │ + │ i64 ┆ str │ + ╞═════╪═════╡ + │ 5 ┆ bar │ + └─────┴─────┘ + """ + if not columns: + return wrap_expr(plr.nth(n)) + + return F.col(*columns).get(n) + + def head(column: str, n: int = 10) -> Expr: """ Get the first `n` rows. 
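
A short usage sketch of the two call forms wired up above (assuming the `df` from
the new docstring examples; per the diff, `pl.nth(n)` with no names maps to the
Rust-side `dsl::nth(n)` column selector, while `pl.nth(n, *columns)` is sugar for
`pl.col(*columns).get(n)`):

    >>> df.select(pl.nth(1))       # whole second column ("b"), one value per row
    >>> df.select(pl.nth(1, "c"))  # single value: equivalent to pl.col("c").get(1)

The first form keeps every row of the selected column; the second returns one
value per named column.
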
diff --git a/py-polars/src/functions/lazy.rs b/py-polars/src/functions/lazy.rs index 5266b9180aa3..a3755679ec09 100644 --- a/py-polars/src/functions/lazy.rs +++ b/py-polars/src/functions/lazy.rs @@ -386,6 +386,11 @@ pub fn last() -> PyExpr { dsl::last().into() } +#[pyfunction] +pub fn nth(n: i64) -> PyExpr { + dsl::nth(n).into() +} + #[pyfunction] pub fn lit(value: &PyAny, allow_object: bool) -> PyResult { if value.is_instance_of::() { diff --git a/py-polars/src/lib.rs b/py-polars/src/lib.rs index ba5fccb77810..6abd7a3056e2 100644 --- a/py-polars/src/lib.rs +++ b/py-polars/src/lib.rs @@ -264,6 +264,7 @@ fn polars(py: Python, m: &Bound) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(functions::last)).unwrap(); m.add_wrapped(wrap_pyfunction!(functions::lit)).unwrap(); m.add_wrapped(wrap_pyfunction!(functions::map_mul)).unwrap(); + m.add_wrapped(wrap_pyfunction!(functions::nth)).unwrap(); m.add_wrapped(wrap_pyfunction!(functions::pearson_corr)) .unwrap(); m.add_wrapped(wrap_pyfunction!(functions::rolling_corr)) diff --git a/py-polars/tests/unit/dataframe/test_df.py b/py-polars/tests/unit/dataframe/test_df.py index e2e7698c9889..0fc418c20141 100644 --- a/py-polars/tests/unit/dataframe/test_df.py +++ b/py-polars/tests/unit/dataframe/test_df.py @@ -2178,7 +2178,7 @@ def test_product() -> None: assert_frame_equal(out, expected, check_dtype=False) -def test_first_last_expression(fruits_cars: pl.DataFrame) -> None: +def test_first_last_nth_expressions(fruits_cars: pl.DataFrame) -> None: df = fruits_cars out = df.select(pl.first()) assert out.columns == ["A"] @@ -2186,6 +2186,15 @@ def test_first_last_expression(fruits_cars: pl.DataFrame) -> None: out = df.select(pl.last()) assert out.columns == ["cars"] + out = df.select(pl.nth(0)) + assert out.columns == ["A"] + + out = df.select(pl.nth(1)) + assert out.columns == ["fruits"] + + out = df.select(pl.nth(-2)) + assert out.columns == ["B"] + def test_is_between(fruits_cars: pl.DataFrame) -> None: result = fruits_cars.select(pl.col("A").is_between(2, 4)).to_series() diff --git a/py-polars/tests/unit/functions/test_functions.py b/py-polars/tests/unit/functions/test_functions.py index 524df06eb9cc..e8b75888fe56 100644 --- a/py-polars/tests/unit/functions/test_functions.py +++ b/py-polars/tests/unit/functions/test_functions.py @@ -447,6 +447,7 @@ def test_lazy_functions() -> None: pl.first("a").name.suffix("_first"), pl.first("b", "c").name.suffix("_first"), pl.last("c", "b", "a").name.suffix("_last"), + pl.nth(1, "c", "a").name.suffix("_nth1"), ) expected: dict[str, list[Any]] = { "b_var": [1.0], @@ -469,6 +470,8 @@ def test_lazy_functions() -> None: "c_last": [4.0], "b_last": [3], "a_last": ["foo"], + "c_nth1": [2.0], + "a_nth1": ["bar"], } assert_frame_equal( out, From 12b40b9394b9823ae7daeca222b056c09b78aa2f Mon Sep 17 00:00:00 2001 From: Alexander Beedie Date: Wed, 8 May 2024 15:12:30 +0400 Subject: [PATCH 11/11] fix: Ensure hex and bitstring literals work inside SQL `IN` clauses (#16101) --- crates/polars-sql/src/sql_expr.rs | 57 ++++++++++++++--------- py-polars/tests/unit/sql/test_literals.py | 34 ++++++++++++-- 2 files changed, 67 insertions(+), 24 deletions(-) diff --git a/crates/polars-sql/src/sql_expr.rs b/crates/polars-sql/src/sql_expr.rs index a2655caf7342..b20fde159b4f 100644 --- a/crates/polars-sql/src/sql_expr.rs +++ b/crates/polars-sql/src/sql_expr.rs @@ -581,24 +581,10 @@ impl SQLExprVisitor<'_> { .map_err(|_| polars_err!(ComputeError: "cannot parse literal: {:?}", s))? 
}, SQLValue::SingleQuotedByteStringLiteral(b) => { - // note: for PostgreSQL this syntax represents a BIT string literal (eg: b'10101') not a BYTE - // string literal (see https://www.postgresql.org/docs/current/datatype-bit.html), but sqlparser + // note: for PostgreSQL this represents a BIT string literal (eg: b'10101') not a BYTE string + // literal (see https://www.postgresql.org/docs/current/datatype-bit.html), but sqlparser // patterned the token name after BigQuery (where b'str' really IS a byte string) - if !b.chars().all(|c| c == '0' || c == '1') { - polars_bail!(ComputeError: "bit string literal should contain only 0s and 1s; found '{}'", b) - } - let n_bits = b.len(); - let s = b.as_str(); - lit(match n_bits { - 0 => b"".to_vec(), - 1..=8 => u8::from_str_radix(s, 2).unwrap().to_be_bytes().to_vec(), - 9..=16 => u16::from_str_radix(s, 2).unwrap().to_be_bytes().to_vec(), - 17..=32 => u32::from_str_radix(s, 2).unwrap().to_be_bytes().to_vec(), - 33..=64 => u64::from_str_radix(s, 2).unwrap().to_be_bytes().to_vec(), - _ => { - polars_bail!(ComputeError: "cannot parse bit string literal with len > 64 (len={:?})", n_bits) - }, - }) + bitstring_to_bytes_literal(b)? }, SQLValue::SingleQuotedString(s) => lit(s.clone()), other => polars_bail!(ComputeError: "SQL value {:?} is not yet supported", other), @@ -635,10 +621,24 @@ impl SQLExprVisitor<'_> { } .map_err(|_| polars_err!(ComputeError: "cannot parse literal: {s:?}"))? }, - SQLValue::SingleQuotedString(s) - | SQLValue::NationalStringLiteral(s) - | SQLValue::HexStringLiteral(s) - | SQLValue::DoubleQuotedString(s) => AnyValue::StringOwned(s.into()), + #[cfg(feature = "binary_encoding")] + SQLValue::HexStringLiteral(x) => { + if x.len() % 2 != 0 { + polars_bail!(ComputeError: "hex string literal must have an even number of digits; found '{}'", x) + }; + AnyValue::BinaryOwned(hex::decode(x.clone()).unwrap()) + }, + SQLValue::SingleQuotedByteStringLiteral(b) => { + // note: for PostgreSQL this represents a BIT literal (eg: b'10101') not BYTE + let bytes_literal = bitstring_to_bytes_literal(b)?; + match bytes_literal { + Expr::Literal(LiteralValue::Binary(v)) => AnyValue::BinaryOwned(v.to_vec()), + _ => polars_bail!(ComputeError: "failed to parse bitstring literal: {:?}", b), + } + }, + SQLValue::SingleQuotedString(s) | SQLValue::DoubleQuotedString(s) => { + AnyValue::StringOwned(s.into()) + }, other => polars_bail!(ComputeError: "SQL value {:?} is not yet supported", other), }) } @@ -1107,3 +1107,18 @@ pub(crate) fn parse_date_part(expr: Expr, part: &str) -> PolarsResult { }, ) } + +fn bitstring_to_bytes_literal(b: &String) -> PolarsResult { + let n_bits = b.len(); + if !b.chars().all(|c| c == '0' || c == '1') || n_bits > 64 { + polars_bail!(ComputeError: "bit string literal should contain only 0s and 1s and have length <= 64; found '{}' with length {}", b, n_bits) + } + let s = b.as_str(); + Ok(lit(match n_bits { + 0 => b"".to_vec(), + 1..=8 => u8::from_str_radix(s, 2).unwrap().to_be_bytes().to_vec(), + 9..=16 => u16::from_str_radix(s, 2).unwrap().to_be_bytes().to_vec(), + 17..=32 => u32::from_str_radix(s, 2).unwrap().to_be_bytes().to_vec(), + _ => u64::from_str_radix(s, 2).unwrap().to_be_bytes().to_vec(), + })) +} diff --git a/py-polars/tests/unit/sql/test_literals.py b/py-polars/tests/unit/sql/test_literals.py index 0f24963e6c64..f46b347fa547 100644 --- a/py-polars/tests/unit/sql/test_literals.py +++ b/py-polars/tests/unit/sql/test_literals.py @@ -6,7 +6,7 @@ from polars.exceptions import ComputeError -def test_bin_hex_literals() -> None: 
+def test_bit_hex_literals() -> None: with pl.SQLContext(df=None, eager_execution=True) as ctx: out = ctx.execute( """ @@ -37,7 +37,7 @@ def test_bin_hex_literals() -> None: } -def test_bin_hex_filter() -> None: +def test_bit_hex_filter() -> None: df = pl.DataFrame( {"bin": [b"\x01", b"\x02", b"\x03", b"\x04"], "val": [9, 8, 7, 6]} ) @@ -47,7 +47,7 @@ def test_bin_hex_filter() -> None: assert out.to_series().to_list() == [7, 6] -def test_bin_hex_errors() -> None: +def test_bit_hex_errors() -> None: with pl.SQLContext(test=None) as ctx: with pytest.raises( ComputeError, @@ -60,3 +60,31 @@ def test_bin_hex_errors() -> None: match="hex string literal must have an even number of digits", ): ctx.execute("SELECT x'00F' FROM test", eager=True) + + with pytest.raises( + ComputeError, + match="hex string literal must have an even number of digits", + ): + pl.sql_expr("colx IN (x'FF',x'123')") + + with pytest.raises( + ComputeError, + match=r'NationalStringLiteral\("hmmm"\) is not yet supported', + ): + pl.sql_expr("N'hmmm'") + + +def test_bit_hex_membership() -> None: + df = pl.DataFrame( + { + "x": [b"\x05", b"\xff", b"\xcc", b"\x0b"], + "y": [1, 2, 3, 4], + } + ) + # this checks the internal `visit_any_value` codepath + for values in ( + "b'0101', b'1011'", + "x'05', x'0b'", + ): + dff = df.filter(pl.sql_expr(f"x IN ({values})")) + assert dff["y"].to_list() == [1, 4]
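
A quick end-to-end sketch of the behaviour the final patch locks in, reusing the
frame from `test_bit_hex_membership` above (per `bitstring_to_bytes_literal`,
`b'0101'` compiles to the single byte 0x05, so the bit-string and hex literal
forms select the same rows):

    import polars as pl

    df = pl.DataFrame({"x": [b"\x05", b"\xff", b"\xcc", b"\x0b"], "y": [1, 2, 3, 4]})
    assert df.filter(pl.sql_expr("x IN (b'0101', b'1011')"))["y"].to_list() == [1, 4]
    assert df.filter(pl.sql_expr("x IN (x'05', x'0b')"))["y"].to_list() == [1, 4]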