serve results of a lazy df in a generator
Boruch Chalk committed May 31, 2024
1 parent d7b4f72 commit 5b8d884
Showing 18 changed files with 234 additions and 0 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/polars-lazy/Cargo.toml
@@ -23,6 +23,7 @@ polars-utils = { workspace = true }

ahash = { workspace = true }
bitflags = { workspace = true }
crossbeam-channel = { workspace = true }
glob = { version = "0.3" }
once_cell = { workspace = true }
pyo3 = { workspace = true, optional = true }
26 changes: 26 additions & 0 deletions crates/polars-lazy/src/frame/mod.rs
@@ -19,6 +19,7 @@ use std::path::PathBuf;
use std::sync::{Arc, Mutex};

pub use anonymous_scan::*;
use crossbeam_channel::{bounded, Receiver};
#[cfg(feature = "csv")]
pub use csv::*;
#[cfg(not(target_arch = "wasm32"))]
@@ -31,6 +32,7 @@ pub use ndjson::*;
#[cfg(feature = "parquet")]
pub use parquet::*;
use polars_core::prelude::*;
use polars_core::POOL;
use polars_io::RowIndex;
use polars_ops::frame::JoinCoalesce;
pub use polars_plan::frame::{AllowedOptimizations, OptState};
@@ -816,6 +818,30 @@ impl LazyFrame {
Ok(())
}

/// Stream the result of this query through a bounded channel, one
/// `DataFrame` batch at a time. Requires the streaming engine.
pub fn sink_to_batches(mut self) -> PolarsResult<Receiver<DataFrame>> {
self.opt_state.streaming = true;
let morsels_per_sink = POOL.current_num_threads();
// Allow a few batches in flight per thread before the producer blocks.
let backpressure = morsels_per_sink * 4;
let (sender, receiver) = bounded(backpressure);
self.logical_plan = DslPlan::Sink {
input: Arc::new(self.logical_plan),
payload: SinkType::Batch {
sender: BatchSender { id: 0, sender },
},
};

let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
polars_ensure!(
is_streaming,
ComputeError: "cannot run the whole query in a streaming fashion"
);
POOL.spawn(move || {
// An error here (e.g. the receiver was dropped) simply ends the
// stream; do not unwrap, as a panic in a pool thread is unobservable.
let _ = physical_plan.execute(&mut state);
});

Ok(receiver)
}
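For orientation, a minimal sketch of how this new API might be driven from Rust; the scan path and predicate are placeholder assumptions, not part of this commit:

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // Hypothetical streaming-friendly query.
    let receiver = LazyFrame::scan_parquet("data.parquet", ScanArgsParquet::default())?
        .filter(col("value").gt(lit(0)))
        .sink_to_batches()?;

    // `crossbeam_channel::Receiver` is iterable: each iteration blocks
    // until a batch arrives, and the loop ends once all senders drop.
    for df in receiver {
        println!("batch with {} rows", df.height());
    }
    Ok(())
}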

/// Filter by some predicate expression.
///
/// The expression must yield boolean values.
3 changes: 3 additions & 0 deletions crates/polars-lazy/src/physical_plan/planner/lp.rs
@@ -163,6 +163,9 @@ fn create_physical_plan_impl(
SinkType::Memory => {
polars_bail!(InvalidOperation: "memory sink not supported in the standard engine")
},
SinkType::Batch { .. } => {
polars_bail!(InvalidOperation: "batch sink not supported in the standard engine")
},
SinkType::File { file_type, .. } => {
polars_bail!(InvalidOperation:
"sink_{file_type:?} not yet supported in standard engine. Use 'collect().write_parquet()'"
51 changes: 51 additions & 0 deletions crates/polars-pipe/src/executors/sinks/output/batch_sink.rs
@@ -0,0 +1,51 @@
use std::any::Any;

use crossbeam_channel::Sender;
use polars_core::prelude::*;

use crate::operators::{
chunks_to_df_unchecked, DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult,
};

#[derive(Clone)]
pub struct BatchSink {
sender: Sender<DataFrame>,
}

impl BatchSink {
pub fn new(sender: Sender<DataFrame>) -> PolarsResult<Self> {
Ok(Self { sender })
}
}

impl Sink for BatchSink {
fn sink(&mut self, _context: &PExecutionContext, chunk: DataChunk) -> PolarsResult<SinkResult> {
let df: DataFrame = chunks_to_df_unchecked(vec![chunk]);
match self.sender.send(df) {
Ok(..) => Ok(SinkResult::CanHaveMoreInput),
// The receiver was dropped: stop the streaming pipeline early.
Err(..) => Ok(SinkResult::Finished),
}
}

fn combine(&mut self, _other: &mut dyn Sink) {
// Nothing to do: all clones share the same channel.
}

fn split(&self, _thread_no: usize) -> Box<dyn Sink> {
Box::new(self.clone())
}

fn finalize(&mut self, _context: &PExecutionContext) -> PolarsResult<FinalizedSink> {
// Send an empty frame as an end-of-batches marker; the channel
// disconnects once every sender has been dropped.
let _ = self.sender.send(Default::default());
Ok(FinalizedSink::Finished(Default::default()))
}

fn as_any(&mut self) -> &mut dyn Any {
self
}

fn fmt(&self) -> &str {
"batch_sink"
}
}
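The `Err(..) => Ok(SinkResult::Finished)` arm leans on standard crossbeam-channel behaviour: once the receiving side is dropped, every subsequent `send` fails, so the consumer can cancel the query simply by dropping the receiver. A standalone sketch of that mechanism (plain crossbeam-channel, nothing polars-specific):

use std::thread;

use crossbeam_channel::bounded;

fn main() {
    let (sender, receiver) = bounded::<i32>(2);
    let producer = thread::spawn(move || {
        for i in 0.. {
            if sender.send(i).is_err() {
                // Receiver hung up: stop producing, the analogue of
                // returning SinkResult::Finished.
                return i;
            }
        }
        unreachable!()
    });

    for v in receiver.iter().take(3) {
        println!("got {v}");
    }
    drop(receiver); // cancel: the producer's next send fails

    println!("producer stopped at {}", producer.join().unwrap());
}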
2 changes: 2 additions & 0 deletions crates/polars-pipe/src/executors/sinks/output/mod.rs
@@ -1,3 +1,4 @@
mod batch_sink;
#[cfg(feature = "csv")]
mod csv;
#[cfg(any(
@@ -14,6 +15,7 @@ mod json
#[cfg(feature = "parquet")]
mod parquet;

pub use batch_sink::*;
#[cfg(feature = "csv")]
pub use csv::*;
#[cfg(feature = "ipc")]
3 changes: 3 additions & 0 deletions crates/polars-pipe/src/pipeline/convert.rs
@@ -174,6 +174,9 @@ where
SinkType::Memory => {
Box::new(OrderedSink::new(input_schema.into_owned())) as Box<dyn SinkTrait>
},
SinkType::Batch { sender } => {
Box::new(BatchSink::new(sender.sender.clone())?) as Box<dyn SinkTrait>
},
#[allow(unused_variables)]
SinkType::File {
path, file_type, ..
1 change: 1 addition & 0 deletions crates/polars-plan/Cargo.toml
@@ -28,6 +28,7 @@ bytemuck = { workspace = true }
chrono = { workspace = true, optional = true }
chrono-tz = { workspace = true, optional = true }
ciborium = { workspace = true, optional = true }
crossbeam-channel = { workspace = true }
either = { workspace = true }
futures = { workspace = true, optional = true }
hashbrown = { workspace = true }
1 change: 1 addition & 0 deletions crates/polars-plan/src/logical_plan/alp/dot.rs
@@ -264,6 +264,7 @@ impl<'a> IRDotDisplay<'a> {
f.write_str(match payload {
SinkType::Memory => "SINK (MEMORY)",
SinkType::File { .. } => "SINK (FILE)",
SinkType::Batch { .. } => "SINK (BATCH)",
#[cfg(feature = "cloud")]
SinkType::Cloud { .. } => "SINK (CLOUD)",
})
1 change: 1 addition & 0 deletions crates/polars-plan/src/logical_plan/alp/format.rs
@@ -314,6 +314,7 @@ impl<'a> IRDisplay<'a> {
let name = match payload {
SinkType::Memory => "SINK (memory)",
SinkType::File { .. } => "SINK (file)",
SinkType::Batch { .. } => "SINK (batch)",
#[cfg(feature = "cloud")]
SinkType::Cloud { .. } => "SINK (cloud)",
};
1 change: 1 addition & 0 deletions crates/polars-plan/src/logical_plan/alp/schema.rs
@@ -37,6 +37,7 @@ impl IR {
Sink { payload, .. } => match payload {
SinkType::Memory => "sink (memory)",
SinkType::File { .. } => "sink (file)",
SinkType::Batch { .. } => "sink (batch)",
#[cfg(feature = "cloud")]
SinkType::Cloud { .. } => "sink (cloud)",
},
1 change: 1 addition & 0 deletions crates/polars-plan/src/logical_plan/alp/tree_format.rs
@@ -339,6 +339,7 @@ impl<'a> TreeFmtNode<'a> {
match payload {
SinkType::Memory => "SINK (memory)",
SinkType::File { .. } => "SINK (file)",
SinkType::Batch { .. } => "SINK (batch)",
#[cfg(feature = "cloud")]
SinkType::Cloud { .. } => "SINK (cloud)",
},
33 changes: 33 additions & 0 deletions crates/polars-plan/src/logical_plan/options.rs
@@ -1,5 +1,7 @@
use std::hash::{Hash, Hasher};
use std::path::PathBuf;

use crossbeam_channel::{bounded, Sender};
use polars_core::prelude::*;
#[cfg(feature = "csv")]
use polars_io::csv::write::CsvWriterOptions;
@@ -223,10 +225,41 @@ pub struct AnonymousScanOptions {
pub fmt_str: &'static str,
}

/// Producer half of the channel through which a streaming query hands
/// result batches to the consumer returned by `sink_to_batches`.
#[derive(Clone, Debug)]
pub struct BatchSender {
pub id: u32,
pub sender: Sender<DataFrame>,
}

// `#[serde(skip)]` restores this field via `Default::default()` on
// deserialization; the receiver is dropped immediately, so sends into a
// defaulted channel fail and simply end the stream.
impl Default for BatchSender {
fn default() -> Self {
let (sender, _receiver) = bounded(1);
Self { id: 0, sender }
}
}

// A `Sender` is neither `PartialEq` nor `Hash`, so equality is defined as
// channel identity and hashing (below) uses only `id`.
impl PartialEq for BatchSender {
fn eq(&self, other: &Self) -> bool {
self.sender.same_channel(&other.sender)
}
}

impl Eq for BatchSender {}

impl Hash for BatchSender {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id.hash(state)
}
}
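Aside: the `Default` impl above matters because `#[serde(skip)]`, used on the `sender` field of `SinkType::Batch` below, rebuilds skipped fields with `Default::default()` during deserialization. A toy sketch of that serde behaviour, with hypothetical stand-in types (`Plan`, `ChannelSlot`) and assuming serde_json is available:

use serde::{Deserialize, Serialize};

// Stand-in for BatchSender: nothing serializable inside.
#[derive(Default)]
struct ChannelSlot;

#[derive(Serialize, Deserialize)]
struct Plan {
    name: String,
    #[serde(skip)] // restored via Default::default() when deserializing
    sender: ChannelSlot,
}

fn main() {
    let plan: Plan = serde_json::from_str(r#"{ "name": "sink" }"#).unwrap();
    assert_eq!(plan.name, "sink");
    // `plan.sender` was rebuilt from Default, not read from the input.
}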

#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
Memory,
Batch {
#[cfg_attr(feature = "serde", serde(skip))]
sender: BatchSender,
},
File {
path: Arc<PathBuf>,
file_type: FileType,
1 change: 1 addition & 0 deletions py-polars/Cargo.toml
@@ -20,6 +20,7 @@ polars-utils = { workspace = true }
ahash = { workspace = true }
arboard = { workspace = true, optional = true }
ciborium = { workspace = true }
crossbeam-channel = { workspace = true }
either = { workspace = true }
itoa = { workspace = true }
libc = "0.2"
35 changes: 35 additions & 0 deletions py-polars/polars/lazyframe/frame.py
@@ -14,6 +14,7 @@
Callable,
ClassVar,
Collection,
Generator,
Iterable,
Mapping,
NoReturn,
@@ -2101,6 +2102,40 @@ def collect_async(
ldf.collect_with_callback(result._callback) # type: ignore[attr-defined]
return result # type: ignore[return-value]

@unstable()
def collect_batches(
    self,
    *,
    type_coercion: bool = True,
    predicate_pushdown: bool = True,
    projection_pushdown: bool = True,
    simplify_expression: bool = True,
    slice_pushdown: bool = True,
    comm_subplan_elim: bool = True,
    comm_subexpr_elim: bool = True,
    cluster_with_columns: bool = True,
    no_optimization: bool = False,
    streaming: bool = True,
    _eager: bool = False,
) -> Generator[DataFrame, None, None]:
    """
    Run the query with the streaming engine and yield each batch of the
    result as a `DataFrame` as soon as it becomes available.

    Yields
    ------
    DataFrame
        Batches of the result, in the order the streaming engine produces them.

    Examples
    --------
    >>> for batch in lf.collect_batches():  # doctest: +SKIP
    ...     process(batch)
    """
    if no_optimization:
        predicate_pushdown = False
        projection_pushdown = False
        slice_pushdown = False
        comm_subplan_elim = False
        comm_subexpr_elim = False
        cluster_with_columns = False

    ldf = self._ldf.optimization_toggle(
        type_coercion,
        predicate_pushdown,
        projection_pushdown,
        simplify_expression,
        slice_pushdown,
        comm_subplan_elim,
        comm_subexpr_elim,
        cluster_with_columns,
        streaming,
        _eager,
    )
    for df in ldf.sink_to_batches():
        yield wrap_df(df)


@unstable()
def sink_parquet(
self,
38 changes: 38 additions & 0 deletions py-polars/src/lazyframe/data_frame_iter.rs
@@ -0,0 +1,38 @@
use crossbeam_channel::Receiver;
use polars::frame::DataFrame;
use pyo3::prelude::*;

use crate::dataframe::PyDataFrame;

/// Iterator over the batches produced by `LazyFrame::sink_to_batches`,
/// optionally truncated to `limit` rows in total.
#[pyclass]
pub struct DataFrameIter {
pub df_receiver: Receiver<DataFrame>,
pub limit: Option<usize>,
pub num_rows: usize,
}

#[pymethods]
impl DataFrameIter {
fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
}

fn __next__(mut slf: PyRefMut<'_, Self>) -> Option<PyDataFrame> {
if let Some(limit) = slf.limit {
if slf.num_rows >= limit {
return None;
}
}
// A disconnected channel means the query has finished.
let mut df = slf.df_receiver.recv().ok()?;

if let Some(limit) = slf.limit {
let remaining = limit - slf.num_rows;
if remaining < df.height() {
// Trim the last batch so the total never exceeds `limit`.
df = df.head(Some(remaining));
}
}
slf.num_rows += df.height();
Some(df.into())
}
}
22 changes: 22 additions & 0 deletions py-polars/src/lazyframe/mod.rs
@@ -1,6 +1,7 @@
mod data_frame_iter;
mod exitable;
mod visit;
pub(crate) mod visitor;
use std::collections::HashMap;
use std::io::BufWriter;
use std::num::NonZeroUsize;
@@ -21,6 +22,7 @@ use crate::error::PyPolarsErr;
use crate::expr::ToExprs;
use crate::file::get_file_like;
use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
use crate::lazyframe::data_frame_iter::DataFrameIter;
use crate::lazyframe::visit::NodeTraverser;
use crate::prelude::*;
use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};
@@ -651,6 +653,26 @@ impl PyLazyFrame {
});
}

#[cfg(feature = "streaming")]
#[pyo3(signature = (limit=None))]
fn sink_to_batches(&self, py: Python, limit: Option<usize>) -> PyResult<DataFrameIter> {
// If we don't allow threads and a UDF tries to acquire the GIL from
// another thread, we deadlock.
let df_receiver = py.allow_threads(|| {
let ldf = self.ldf.clone();
ldf.sink_to_batches().map_err(PyPolarsErr::from)
})?;
Ok(DataFrameIter {
df_receiver,
limit,
num_rows: 0,
})
}

#[cfg(all(feature = "streaming", feature = "parquet"))]
#[pyo3(signature = (path, compression, compression_level, statistics, row_group_size, data_pagesize_limit, maintain_order))]
fn sink_parquet(