perf: Parallelize arrow conversion if binview -> large_bin
ritchie46 committed Jun 20, 2024
1 parent 760067c commit 60d5b35
Showing 9 changed files with 57 additions and 21 deletions.
37 changes: 29 additions & 8 deletions crates/polars-core/src/frame/mod.rs
@@ -2360,12 +2360,25 @@ impl DataFrame {
     /// This responsibility is left to the caller as we don't want to take mutable references here,
     /// but we also don't want to rechunk here, as this operation is costly and would benefit the caller
     /// as well.
-    pub fn iter_chunks(&self, pl_flavor: bool) -> RecordBatchIter {
+    pub fn iter_chunks(&self, pl_flavor: bool, parallel: bool) -> RecordBatchIter {
+        // If any of the columns is binview and we don't convert to `pl_flavor`, we allow
+        // parallelism, as we must allocate Arrow strings/binaries.
+        let parallel = if parallel && !pl_flavor {
+            self.columns.len() > 1
+                && self
+                    .columns
+                    .iter()
+                    .any(|s| matches!(s.dtype(), DataType::String | DataType::Binary))
+        } else {
+            false
+        };
+
         RecordBatchIter {
             columns: &self.columns,
             idx: 0,
             n_chunks: self.n_chunks(),
             pl_flavor,
+            parallel,
         }
     }
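For readers skimming the hunk above: the new `parallel` argument is only honored when the conversion actually has to re-encode data. A minimal standalone sketch of that gate (toy types, not the polars API):

```rust
// Stand-in for the `parallel` gate in `iter_chunks`; `DType` is a toy enum,
// not polars' DataType.
#[derive(Debug)]
enum DType {
    String,
    Binary,
    Int64,
}

fn should_parallelize(dtypes: &[DType], pl_flavor: bool, parallel: bool) -> bool {
    if !parallel || pl_flavor {
        // pl_flavor keeps the binview layout, so conversion is cheap: stay sequential.
        return false;
    }
    // Only worth parallelizing if several columns exist and at least one
    // requires re-encoding its string/binary buffers.
    dtypes.len() > 1 && dtypes.iter().any(|d| matches!(d, DType::String | DType::Binary))
}

fn main() {
    assert!(should_parallelize(&[DType::Int64, DType::String], false, true));
    assert!(!should_parallelize(&[DType::Int64, DType::String], true, true)); // zero-copy path
    assert!(!should_parallelize(&[DType::String], false, true)); // single column: no gain
    println!("gate behaves as sketched");
}
```

The `pl_flavor` path hands over Polars' native binview buffers zero-copy, so spawning threads there would only add overhead.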

@@ -2984,6 +2997,7 @@ pub struct RecordBatchIter<'a> {
     idx: usize,
     n_chunks: usize,
     pl_flavor: bool,
+    parallel: bool,
 }

 impl<'a> Iterator for RecordBatchIter<'a> {
@@ -2993,12 +3007,19 @@ impl<'a> Iterator for RecordBatchIter<'a> {
         if self.idx >= self.n_chunks {
             None
         } else {
-            // create a batch of the columns with the same chunk no.
-            let batch_cols = self
-                .columns
-                .iter()
-                .map(|s| s.to_arrow(self.idx, self.pl_flavor))
-                .collect();
+            // Create a batch of the columns with the same chunk no.
+            let batch_cols = if self.parallel {
+                let iter = self
+                    .columns
+                    .par_iter()
+                    .map(|s| s.to_arrow(self.idx, self.pl_flavor));
+                POOL.install(|| iter.collect())
+            } else {
+                self.columns
+                    .iter()
+                    .map(|s| s.to_arrow(self.idx, self.pl_flavor))
+                    .collect()
+            };
             self.idx += 1;

             Some(RecordBatch::new(batch_cols))
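The hunk above builds the parallel iterator first and then collects inside `POOL.install`, so the per-column `to_arrow` calls run on polars' shared rayon pool. A self-contained sketch of the same shape with plain rayon (`fake_convert` and the pool size are illustrative, not polars code):

```rust
// Sketch only: mirrors the par_iter + install(collect) shape used in the diff.
use rayon::prelude::*;

fn fake_convert(x: &u64) -> u64 {
    // Stand-in for `Series::to_arrow`, i.e. a per-column buffer re-encode.
    x.wrapping_mul(0x9E3779B97F4A7C15)
}

fn main() {
    // In polars this pool is the global `POOL`; here we build our own.
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .expect("failed to build thread pool");

    let columns: Vec<u64> = (0..16).collect();

    // Build the parallel iterator first, then run the collect inside the pool,
    // so the per-column work executes on the pool's worker threads.
    let iter = columns.par_iter().map(fake_convert);
    let converted: Vec<u64> = pool.install(|| iter.collect());

    assert_eq!(converted.len(), columns.len());
}
```

Running the collect under `install` pins the work to one known pool rather than rayon's implicit global pool.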
@@ -3074,7 +3095,7 @@ mod test {
             "foo" => &[1, 2, 3, 4, 5]
         )
         .unwrap();
-        let mut iter = df.iter_chunks(true);
+        let mut iter = df.iter_chunks(true, false);
         assert_eq!(5, iter.next().unwrap().len());
         assert!(iter.next().is_none());
     }
2 changes: 1 addition & 1 deletion crates/polars-io/src/avro/write.rs
@@ -69,7 +69,7 @@ where

         let mut data = vec![];
         let mut compressed_block = avro_schema::file::CompressedBlock::default();
-        for chunk in df.iter_chunks(false) {
+        for chunk in df.iter_chunks(false, true) {
             let mut serializers = chunk
                 .iter()
                 .zip(record.fields.iter())
2 changes: 1 addition & 1 deletion crates/polars-io/src/ipc/ipc_stream.rs
@@ -249,7 +249,7 @@ where

         ipc_stream_writer.start(&df.schema().to_arrow(self.pl_flavor), None)?;
         let df = chunk_df_for_writing(df, 512 * 512)?;
-        let iter = df.iter_chunks(self.pl_flavor);
+        let iter = df.iter_chunks(self.pl_flavor, true);

         for batch in iter {
             ipc_stream_writer.write(&batch, None)?
4 changes: 2 additions & 2 deletions crates/polars-io/src/ipc/write.rs
@@ -99,7 +99,7 @@ where
            },
        )?;
        df.align_chunks();
-       let iter = df.iter_chunks(self.pl_flavor);
+       let iter = df.iter_chunks(self.pl_flavor, true);

        for batch in iter {
            ipc_writer.write(&batch, None)?
@@ -120,7 +120,7 @@ impl<W: Write> BatchedWriter<W> {
     /// # Panics
     /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
     pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
-        let iter = df.iter_chunks(self.pl_flavor);
+        let iter = df.iter_chunks(self.pl_flavor, true);
         for batch in iter {
             self.writer.write(&batch, None)?
         }
2 changes: 1 addition & 1 deletion crates/polars-io/src/ipc/write_async.rs
@@ -44,7 +44,7 @@ where
     /// # Panics
     /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
     pub async fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
-        let iter = df.iter_chunks(false);
+        let iter = df.iter_chunks(false, true);
         for batch in iter {
             self.writer.feed(batch.into()).await?;
         }
4 changes: 2 additions & 2 deletions crates/polars-io/src/json/mod.rs
@@ -150,7 +150,7 @@ where
             })
             .collect::<PolarsResult<Vec<_>>>()?;
         let batches = df
-            .iter_chunks(true)
+            .iter_chunks(true, false)
             .map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));

         match self.json_format {
@@ -194,7 +194,7 @@
                 Ok(s.field().to_arrow(true))
             })
             .collect::<PolarsResult<Vec<_>>>()?;
-        let chunks = df.iter_chunks(true);
+        let chunks = df.iter_chunks(true, false);
         let batches =
             chunks.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
         let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
4 changes: 2 additions & 2 deletions crates/polars-io/src/parquet/write/batched_writer.rs
@@ -27,7 +27,7 @@ impl<W: Write> BatchedWriter<W> {
         &'a self,
         df: &'a DataFrame,
     ) -> impl Iterator<Item = PolarsResult<RowGroupIterColumns<'static, PolarsError>>> + 'a {
-        let rb_iter = df.iter_chunks(true);
+        let rb_iter = df.iter_chunks(true, false);
         rb_iter.filter_map(move |batch| match batch.len() {
             0 => None,
             _ => {
@@ -95,7 +95,7 @@ fn prepare_rg_iter<'a>(
     options: WriteOptions,
     parallel: bool,
 ) -> impl Iterator<Item = PolarsResult<RowGroupIterColumns<'static, PolarsError>>> + 'a {
-    let rb_iter = df.iter_chunks(true);
+    let rb_iter = df.iter_chunks(true, false);
     rb_iter.filter_map(move |batch| match batch.len() {
         0 => None,
         _ => {
19 changes: 17 additions & 2 deletions py-polars/polars/dataframe/frame.py
@@ -1359,7 +1359,7 @@ def item(self, row: int | None = None, column: int | str | None = None) -> Any:
         )
         return s.get_index_signed(row)

-    def to_arrow(self) -> pa.Table:
+    def to_arrow(self, *, future: bool = False) -> pa.Table:
         """
         Collect the underlying arrow arrays in an Arrow Table.
@@ -1368,6 +1368,16 @@ def to_arrow(self) -> pa.Table:
         Data types that do copy:
             - CategoricalType

+        Parameters
+        ----------
+        future
+            Setting this to `True` will write Polars' internal data structures that
+            might not be supported by other Arrow implementations.
+
+            .. warning::
+                This functionality is considered **unstable**. It may be changed
+                at any point without it being considered a breaking change.
+
         Examples
         --------
         >>> df = pl.DataFrame(
@@ -1384,7 +1394,12 @@ def to_arrow(self) -> pa.Table:
         if not self.width:  # 0x0 dataframe, cannot infer schema from batches
             return pa.table({})

-        record_batches = self._df.to_arrow()
+        if future:
+            issue_unstable_warning(
+                "The `future` parameter of `DataFrame.to_arrow` is considered unstable."
+            )
+
+        record_batches = self._df.to_arrow(future)
         return pa.Table.from_batches(record_batches)

     @overload
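On the Python side (previous file), `future` flows into `PyDataFrame::to_arrow` below as the `pl_flavor` flag. A hedged sketch of what that flag trades off, inferred from the commit title's `binview -> large_bin` (the enum and function here are illustrative, not polars internals):

```rust
// Illustrative mapping only (assumed from the commit title, not polars code):
// `pl_flavor == true` keeps Polars' native Arrow "view" layout (zero-copy),
// while `pl_flavor == false` re-encodes to the widely supported large types.
#[derive(Debug, PartialEq)]
enum ArrowStringType {
    Utf8View,  // "binview": buffers handed over as-is
    LargeUtf8, // "large_utf8"/"large_bin": offsets + data must be re-allocated
}

fn exported_string_type(pl_flavor: bool) -> ArrowStringType {
    if pl_flavor {
        ArrowStringType::Utf8View
    } else {
        // This copy is the per-column work the new `parallel` flag distributes.
        ArrowStringType::LargeUtf8
    }
}

fn main() {
    // `to_arrow(future=True)` on the Python side ends up on the zero-copy path.
    assert_eq!(exported_string_type(true), ArrowStringType::Utf8View);
    // The default re-encodes, so the conversion is worth parallelizing.
    assert_eq!(exported_string_type(false), ArrowStringType::LargeUtf8);
}
```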
4 changes: 2 additions & 2 deletions py-polars/src/dataframe/export.rs
@@ -63,15 +63,15 @@ impl PyDataFrame {
     }

     #[allow(clippy::wrong_self_convention)]
-    pub fn to_arrow(&mut self) -> PyResult<Vec<PyObject>> {
+    pub fn to_arrow(&mut self, future: bool) -> PyResult<Vec<PyObject>> {
         self.df.align_chunks();
         Python::with_gil(|py| {
             let pyarrow = py.import_bound("pyarrow")?;
             let names = self.df.get_column_names();

             let rbs = self
                 .df
-                .iter_chunks(false)
+                .iter_chunks(future, true)
                 .map(|rb| interop::arrow::to_py::to_py_rb(&rb, &names, py, &pyarrow))
                 .collect::<PyResult<_>>()?;
             Ok(rbs)
