diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index f627c9cd6bb2..0a8f130dfe51 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -2360,12 +2360,25 @@ impl DataFrame { /// This responsibility is left to the caller as we don't want to take mutable references here, /// but we also don't want to rechunk here, as this operation is costly and would benefit the caller /// as well. - pub fn iter_chunks(&self, pl_flavor: bool) -> RecordBatchIter { + pub fn iter_chunks(&self, pl_flavor: bool, parallel: bool) -> RecordBatchIter { + // If any of the columns is binview and we don't convert `pl_flavor` we allow parallelism + // as we must allocate arrow strings/binaries. + let parallel = if parallel && !pl_flavor { + self.columns.len() > 1 + && self + .columns + .iter() + .any(|s| matches!(s.dtype(), DataType::String | DataType::Binary)) + } else { + false + }; + RecordBatchIter { columns: &self.columns, idx: 0, n_chunks: self.n_chunks(), pl_flavor, + parallel, } } @@ -2984,6 +2997,7 @@ pub struct RecordBatchIter<'a> { idx: usize, n_chunks: usize, pl_flavor: bool, + parallel: bool, } impl<'a> Iterator for RecordBatchIter<'a> { @@ -2993,12 +3007,19 @@ impl<'a> Iterator for RecordBatchIter<'a> { if self.idx >= self.n_chunks { None } else { - // create a batch of the columns with the same chunk no. - let batch_cols = self - .columns - .iter() - .map(|s| s.to_arrow(self.idx, self.pl_flavor)) - .collect(); + // Create a batch of the columns with the same chunk no. + let batch_cols = if self.parallel { + let iter = self + .columns + .par_iter() + .map(|s| s.to_arrow(self.idx, self.pl_flavor)); + POOL.install(|| iter.collect()) + } else { + self.columns + .iter() + .map(|s| s.to_arrow(self.idx, self.pl_flavor)) + .collect() + }; self.idx += 1; Some(RecordBatch::new(batch_cols)) @@ -3074,7 +3095,7 @@ mod test { "foo" => &[1, 2, 3, 4, 5] ) .unwrap(); - let mut iter = df.iter_chunks(true); + let mut iter = df.iter_chunks(true, false); assert_eq!(5, iter.next().unwrap().len()); assert!(iter.next().is_none()); } diff --git a/crates/polars-io/src/avro/write.rs b/crates/polars-io/src/avro/write.rs index 82f5ab7d0f30..b12cd358da16 100644 --- a/crates/polars-io/src/avro/write.rs +++ b/crates/polars-io/src/avro/write.rs @@ -69,7 +69,7 @@ where let mut data = vec![]; let mut compressed_block = avro_schema::file::CompressedBlock::default(); - for chunk in df.iter_chunks(false) { + for chunk in df.iter_chunks(false, true) { let mut serializers = chunk .iter() .zip(record.fields.iter()) diff --git a/crates/polars-io/src/ipc/ipc_stream.rs b/crates/polars-io/src/ipc/ipc_stream.rs index 81ee77dd8456..ed0872dc1ea0 100644 --- a/crates/polars-io/src/ipc/ipc_stream.rs +++ b/crates/polars-io/src/ipc/ipc_stream.rs @@ -249,7 +249,7 @@ where ipc_stream_writer.start(&df.schema().to_arrow(self.pl_flavor), None)?; let df = chunk_df_for_writing(df, 512 * 512)?; - let iter = df.iter_chunks(self.pl_flavor); + let iter = df.iter_chunks(self.pl_flavor, true); for batch in iter { ipc_stream_writer.write(&batch, None)? diff --git a/crates/polars-io/src/ipc/write.rs b/crates/polars-io/src/ipc/write.rs index 51c12ad4a0c4..cdc60fc250f6 100644 --- a/crates/polars-io/src/ipc/write.rs +++ b/crates/polars-io/src/ipc/write.rs @@ -99,7 +99,7 @@ where }, )?; df.align_chunks(); - let iter = df.iter_chunks(self.pl_flavor); + let iter = df.iter_chunks(self.pl_flavor, true); for batch in iter { ipc_writer.write(&batch, None)? @@ -120,7 +120,7 @@ impl BatchedWriter { /// # Panics /// The caller must ensure the chunks in the given [`DataFrame`] are aligned. pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { - let iter = df.iter_chunks(self.pl_flavor); + let iter = df.iter_chunks(self.pl_flavor, true); for batch in iter { self.writer.write(&batch, None)? } diff --git a/crates/polars-io/src/ipc/write_async.rs b/crates/polars-io/src/ipc/write_async.rs index fcaa0738f7c3..7a5b3240cbb5 100644 --- a/crates/polars-io/src/ipc/write_async.rs +++ b/crates/polars-io/src/ipc/write_async.rs @@ -44,7 +44,7 @@ where /// # Panics /// The caller must ensure the chunks in the given [`DataFrame`] are aligned. pub async fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { - let iter = df.iter_chunks(false); + let iter = df.iter_chunks(false, true); for batch in iter { self.writer.feed(batch.into()).await?; } diff --git a/crates/polars-io/src/json/mod.rs b/crates/polars-io/src/json/mod.rs index 9b0d3b8b6206..c51e238da2f9 100644 --- a/crates/polars-io/src/json/mod.rs +++ b/crates/polars-io/src/json/mod.rs @@ -150,7 +150,7 @@ where }) .collect::>>()?; let batches = df - .iter_chunks(true) + .iter_chunks(true, false) .map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef)); match self.json_format { @@ -194,7 +194,7 @@ where Ok(s.field().to_arrow(true)) }) .collect::>>()?; - let chunks = df.iter_chunks(true); + let chunks = df.iter_chunks(true, false); let batches = chunks.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef)); let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]); diff --git a/crates/polars-io/src/parquet/write/batched_writer.rs b/crates/polars-io/src/parquet/write/batched_writer.rs index f5b42b7ef690..5f363e0eb1a3 100644 --- a/crates/polars-io/src/parquet/write/batched_writer.rs +++ b/crates/polars-io/src/parquet/write/batched_writer.rs @@ -27,7 +27,7 @@ impl BatchedWriter { &'a self, df: &'a DataFrame, ) -> impl Iterator>> + 'a { - let rb_iter = df.iter_chunks(true); + let rb_iter = df.iter_chunks(true, false); rb_iter.filter_map(move |batch| match batch.len() { 0 => None, _ => { @@ -95,7 +95,7 @@ fn prepare_rg_iter<'a>( options: WriteOptions, parallel: bool, ) -> impl Iterator>> + 'a { - let rb_iter = df.iter_chunks(true); + let rb_iter = df.iter_chunks(true, false); rb_iter.filter_map(move |batch| match batch.len() { 0 => None, _ => { diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index 8912ae73be84..968278333644 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -1359,7 +1359,7 @@ def item(self, row: int | None = None, column: int | str | None = None) -> Any: ) return s.get_index_signed(row) - def to_arrow(self) -> pa.Table: + def to_arrow(self, *, future=False) -> pa.Table: """ Collect the underlying arrow arrays in an Arrow Table. @@ -1368,6 +1368,16 @@ def to_arrow(self) -> pa.Table: Data types that do copy: - CategoricalType + Parameters + ---------- + future + Setting this to `True` will write Polars' internal data structures that + might not be available by other Arrow implementations. + + .. warning:: + This functionality is considered **unstable**. It may be changed + at any point without it being considered a breaking change. + Examples -------- >>> df = pl.DataFrame( @@ -1384,7 +1394,12 @@ def to_arrow(self) -> pa.Table: if not self.width: # 0x0 dataframe, cannot infer schema from batches return pa.table({}) - record_batches = self._df.to_arrow() + if future: + issue_unstable_warning( + "The `future` parameter of `DataFrame.to_arrow` is considered unstable." + ) + + record_batches = self._df.to_arrow(future) return pa.Table.from_batches(record_batches) @overload diff --git a/py-polars/src/dataframe/export.rs b/py-polars/src/dataframe/export.rs index 1c6d9bfc7698..2fa36b3d5a33 100644 --- a/py-polars/src/dataframe/export.rs +++ b/py-polars/src/dataframe/export.rs @@ -63,7 +63,7 @@ impl PyDataFrame { } #[allow(clippy::wrong_self_convention)] - pub fn to_arrow(&mut self) -> PyResult> { + pub fn to_arrow(&mut self, future: bool) -> PyResult> { self.df.align_chunks(); Python::with_gil(|py| { let pyarrow = py.import_bound("pyarrow")?; @@ -71,7 +71,7 @@ impl PyDataFrame { let rbs = self .df - .iter_chunks(false) + .iter_chunks(future, true) .map(|rb| interop::arrow::to_py::to_py_rb(&rb, &names, py, &pyarrow)) .collect::>()?; Ok(rbs)