diff --git a/Cargo.lock b/Cargo.lock
index 2703f5de5a08..bb4f7a48cd60 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3028,6 +3028,7 @@ version = "0.40.0"
 dependencies = [
  "ahash",
  "bitflags 2.5.0",
+ "crossbeam-channel",
  "futures",
  "glob",
  "once_cell",
@@ -3148,6 +3149,7 @@ dependencies = [
  "chrono",
  "chrono-tz",
  "ciborium",
+ "crossbeam-channel",
  "either",
  "futures",
  "hashbrown",
@@ -3324,6 +3326,7 @@ dependencies = [
  "arboard",
  "built",
  "ciborium",
+ "crossbeam-channel",
  "either",
  "itoa",
  "jemallocator",
diff --git a/crates/polars-lazy/Cargo.toml b/crates/polars-lazy/Cargo.toml
index 054089ff404f..f5b9a795fa92 100644
--- a/crates/polars-lazy/Cargo.toml
+++ b/crates/polars-lazy/Cargo.toml
@@ -23,6 +23,7 @@ polars-utils = { workspace = true }
 
 ahash = { workspace = true }
 bitflags = { workspace = true }
+crossbeam-channel = { workspace = true }
 glob = { version = "0.3" }
 once_cell = { workspace = true }
 pyo3 = { workspace = true, optional = true }
diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs
index 44155a040995..243fa3a3520e 100644
--- a/crates/polars-lazy/src/frame/mod.rs
+++ b/crates/polars-lazy/src/frame/mod.rs
@@ -19,6 +19,7 @@ use std::path::PathBuf;
 use std::sync::{Arc, Mutex};
 
 pub use anonymous_scan::*;
+use crossbeam_channel::{bounded, Receiver};
 #[cfg(feature = "csv")]
 pub use csv::*;
 #[cfg(not(target_arch = "wasm32"))]
@@ -31,6 +32,7 @@ pub use ndjson::*;
 #[cfg(feature = "parquet")]
 pub use parquet::*;
 use polars_core::prelude::*;
+use polars_core::POOL;
 use polars_io::RowIndex;
 use polars_ops::frame::JoinCoalesce;
 pub use polars_plan::frame::{AllowedOptimizations, OptState};
@@ -816,6 +818,30 @@ impl LazyFrame {
         Ok(())
     }
 
+    pub fn sink_to_batches(mut self) -> Result<Receiver<DataFrame>, PolarsError> {
+        self.opt_state.streaming = true;
+        let morsels_per_sink = POOL.current_num_threads();
+        let backpressure = morsels_per_sink * 4;
+        let (sender, receiver) = bounded(backpressure);
+        self.logical_plan = DslPlan::Sink {
+            input: Arc::new(self.logical_plan),
+            payload: SinkType::Batch {
+                sender: BatchSender { id: 0, sender },
+            },
+        };
+
+        let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
+        polars_ensure!(
+            is_streaming,
+            ComputeError: format!("cannot run the whole query in a streaming order")
+        );
+        POOL.spawn(move || {
+            let _ = physical_plan.execute(&mut state).unwrap();
+        });
+
+        Ok(receiver)
+    }
+
     /// Filter by some predicate expression.
     ///
     /// The expression must yield boolean values.
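The new `sink_to_batches` above hands back the receiving end of a bounded crossbeam channel while the streaming engine keeps executing the query on the rayon pool in the background. A minimal sketch of how a Rust caller might drain that receiver; the tiny `df!` input and column name are illustrative only, and the `lazy` and `streaming` features of the `polars` facade crate are assumed to be enabled:

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // Any streaming-friendly LazyFrame works here; this tiny frame is only for illustration.
    let lf = df!("col_1" => vec![0i64, 0, 1, 1])?.lazy();

    // The query starts executing on the thread pool right away; the receiver yields
    // one DataFrame per chunk the streaming pipeline finishes.
    let batches = lf.sink_to_batches()?;

    // Blocks on each receive and ends once every sender clone has been dropped.
    for batch in batches {
        println!("received a batch with {} rows", batch.height());
    }
    Ok(())
}

Note that the final item can be an empty frame, because the batch sink (added later in this diff) sends `Default::default()` when it finalizes.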
diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs
index 4e0e169847fe..b9176974630e 100644
--- a/crates/polars-lazy/src/physical_plan/planner/lp.rs
+++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs
@@ -163,6 +163,9 @@ fn create_physical_plan_impl(
         SinkType::Memory => {
             polars_bail!(InvalidOperation: "memory sink not supported in the standard engine")
         },
+        SinkType::Batch { .. } => {
+            polars_bail!(InvalidOperation: "batch sink not supported in the standard engine")
+        },
         SinkType::File { file_type, .. } => {
             polars_bail!(InvalidOperation:
                 "sink_{file_type:?} not yet supported in standard engine. Use 'collect().write_parquet()'"
diff --git a/crates/polars-pipe/src/executors/sinks/output/batch_sink.rs b/crates/polars-pipe/src/executors/sinks/output/batch_sink.rs
new file mode 100644
index 000000000000..58fd05e037a5
--- /dev/null
+++ b/crates/polars-pipe/src/executors/sinks/output/batch_sink.rs
@@ -0,0 +1,51 @@
+use std::any::Any;
+
+use crossbeam_channel::Sender;
+use polars_core::prelude::*;
+
+use crate::operators::{
+    chunks_to_df_unchecked, DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult,
+};
+
+#[derive(Clone)]
+pub struct BatchSink {
+    sender: Sender<DataFrame>,
+}
+
+impl BatchSink {
+    pub fn new(sender: Sender<DataFrame>) -> PolarsResult<Self> {
+        Ok(Self { sender })
+    }
+}
+
+impl Sink for BatchSink {
+    fn sink(&mut self, _context: &PExecutionContext, chunk: DataChunk) -> PolarsResult<SinkResult> {
+        let df: DataFrame = chunks_to_df_unchecked(vec![chunk]);
+        let result = self.sender.send(df);
+        match result {
+            Ok(..) => Ok(SinkResult::CanHaveMoreInput),
+            Err(..) => Ok(SinkResult::Finished),
+        }
+    }
+
+    fn combine(&mut self, _other: &mut dyn Sink) {
+        // Nothing to do
+    }
+
+    fn split(&self, _thread_no: usize) -> Box<dyn Sink> {
+        Box::new(self.clone())
+    }
+
+    fn finalize(&mut self, _context: &PExecutionContext) -> PolarsResult<FinalizedSink> {
+        let _ = self.sender.send(Default::default());
+        Ok(FinalizedSink::Finished(Default::default()))
+    }
+
+    fn as_any(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    fn fmt(&self) -> &str {
+        "batch_sink"
+    }
+}
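`BatchSink` leans entirely on crossbeam channel semantics: a successful `send` means the pipeline may keep producing, a failed `send` means the receiver is gone and the sink reports `SinkResult::Finished`, and the bounded capacity chosen in `sink_to_batches` (`morsels_per_sink * 4`) is what throttles a fast producer. A standalone sketch of both behaviors with plain `crossbeam-channel`; the capacity of 2 and the `u32` payload are arbitrary stand-ins:

use std::thread;

use crossbeam_channel::bounded;

fn main() {
    // `send` blocks once two items are in flight: this is the backpressure mechanism.
    let (sender, receiver) = bounded::<u32>(2);

    let producer = thread::spawn(move || {
        for i in 0..10 {
            // An Err from `send` means the receiving side was dropped; BatchSink maps
            // that case to SinkResult::Finished so the streaming query can stop early.
            if sender.send(i).is_err() {
                println!("receiver dropped, stopping at item {i}");
                return;
            }
        }
    });

    // Consume only the first three items, then drop the receiver.
    for item in receiver.iter().take(3) {
        println!("got {item}");
    }
    drop(receiver);

    producer.join().unwrap();
}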
} => "SINK (CLOUD)", }) diff --git a/crates/polars-plan/src/logical_plan/alp/format.rs b/crates/polars-plan/src/logical_plan/alp/format.rs index f7debc9c7223..095f19fa99f6 100644 --- a/crates/polars-plan/src/logical_plan/alp/format.rs +++ b/crates/polars-plan/src/logical_plan/alp/format.rs @@ -314,6 +314,7 @@ impl<'a> IRDisplay<'a> { let name = match payload { SinkType::Memory => "SINK (memory)", SinkType::File { .. } => "SINK (file)", + SinkType::Batch { .. } => "SINK (batch)", #[cfg(feature = "cloud")] SinkType::Cloud { .. } => "SINK (cloud)", }; diff --git a/crates/polars-plan/src/logical_plan/alp/schema.rs b/crates/polars-plan/src/logical_plan/alp/schema.rs index 6047fe6d5943..3fdfd020e1a3 100644 --- a/crates/polars-plan/src/logical_plan/alp/schema.rs +++ b/crates/polars-plan/src/logical_plan/alp/schema.rs @@ -37,6 +37,7 @@ impl IR { Sink { payload, .. } => match payload { SinkType::Memory => "sink (memory)", SinkType::File { .. } => "sink (file)", + SinkType::Batch { .. } => "sink (batch)", #[cfg(feature = "cloud")] SinkType::Cloud { .. } => "sink (cloud)", }, diff --git a/crates/polars-plan/src/logical_plan/alp/tree_format.rs b/crates/polars-plan/src/logical_plan/alp/tree_format.rs index 7337a6c33201..50aa4de48436 100644 --- a/crates/polars-plan/src/logical_plan/alp/tree_format.rs +++ b/crates/polars-plan/src/logical_plan/alp/tree_format.rs @@ -339,6 +339,7 @@ impl<'a> TreeFmtNode<'a> { match payload { SinkType::Memory => "SINK (memory)", SinkType::File { .. } => "SINK (file)", + SinkType::Batch { .. } => "SINK (batch)", #[cfg(feature = "cloud")] SinkType::Cloud { .. } => "SINK (cloud)", }, diff --git a/crates/polars-plan/src/logical_plan/options.rs b/crates/polars-plan/src/logical_plan/options.rs index 67d1e3a43985..04d86e817650 100644 --- a/crates/polars-plan/src/logical_plan/options.rs +++ b/crates/polars-plan/src/logical_plan/options.rs @@ -1,5 +1,7 @@ +use std::hash::{Hash, Hasher}; use std::path::PathBuf; +use crossbeam_channel::{bounded, Sender}; use polars_core::prelude::*; #[cfg(feature = "csv")] use polars_io::csv::write::CsvWriterOptions; @@ -223,10 +225,41 @@ pub struct AnonymousScanOptions { pub fmt_str: &'static str, } +#[derive(Clone, Debug)] +pub struct BatchSender { + pub id: u32, + pub sender: Sender, +} + +impl Default for BatchSender { + fn default() -> Self { + let (sender, _receiver) = bounded(1); + Self { id: 0, sender } + } +} + +impl PartialEq for BatchSender { + fn eq(&self, other: &Self) -> bool { + self.sender.same_channel(&other.sender) + } +} + +impl Eq for BatchSender {} + +impl Hash for BatchSender { + fn hash(&self, state: &mut H) { + self.id.hash(state) + } +} + #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub enum SinkType { Memory, + Batch { + #[cfg_attr(feature = "serde", serde(skip))] + sender: BatchSender, + }, File { path: Arc, file_type: FileType, diff --git a/py-polars/Cargo.toml b/py-polars/Cargo.toml index 19433071b671..c9609a6d7fb2 100644 --- a/py-polars/Cargo.toml +++ b/py-polars/Cargo.toml @@ -20,6 +20,7 @@ polars-utils = { workspace = true } ahash = { workspace = true } arboard = { workspace = true, optional = true } ciborium = { workspace = true } +crossbeam-channel = { workspace = true } either = { workspace = true } itoa = { workspace = true } libc = "0.2" diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 2bc89e407c30..df77fc4f822e 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -14,6 
diff --git a/py-polars/Cargo.toml b/py-polars/Cargo.toml
index 19433071b671..c9609a6d7fb2 100644
--- a/py-polars/Cargo.toml
+++ b/py-polars/Cargo.toml
@@ -20,6 +20,7 @@ polars-utils = { workspace = true }
 ahash = { workspace = true }
 arboard = { workspace = true, optional = true }
 ciborium = { workspace = true }
+crossbeam-channel = { workspace = true }
 either = { workspace = true }
 itoa = { workspace = true }
 libc = "0.2"
diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py
index 2bc89e407c30..df77fc4f822e 100644
--- a/py-polars/polars/lazyframe/frame.py
+++ b/py-polars/polars/lazyframe/frame.py
@@ -14,6 +14,7 @@
     Callable,
     ClassVar,
     Collection,
+    Generator,
     Iterable,
     Mapping,
     NoReturn,
@@ -2101,6 +2102,40 @@ def collect_async(
         ldf.collect_with_callback(result._callback)  # type: ignore[attr-defined]
         return result  # type: ignore[return-value]
 
+    @unstable()
+    def collect_batches(
+        self,
+        *,
+        type_coercion: bool = True,
+        predicate_pushdown: bool = True,
+        projection_pushdown: bool = True,
+        simplify_expression: bool = True,
+        slice_pushdown: bool = True,
+        comm_subplan_elim: bool = True,
+        comm_subexpr_elim: bool = True,
+        cluster_with_columns: bool = True,
+        no_optimization: bool = False,
+        streaming: bool = True,
+        _eager: bool = False,
+    ) -> Generator[DataFrame, None, None]:
+        """Collect the LazyFrame in batches, yielding one DataFrame per batch."""
+        ldf = self._ldf.optimization_toggle(
+            type_coercion,
+            predicate_pushdown,
+            projection_pushdown,
+            simplify_expression,
+            slice_pushdown,
+            comm_subplan_elim,
+            comm_subexpr_elim,
+            cluster_with_columns,
+            streaming,
+            _eager,
+        )
+        df_iter = ldf.sink_to_batches()
+        for df in df_iter:
+            yield wrap_df(df)
+
+
     @unstable()
     def sink_parquet(
         self,
diff --git a/py-polars/src/lazyframe/data_frame_iter.rs b/py-polars/src/lazyframe/data_frame_iter.rs
new file mode 100644
index 000000000000..bc026deda0d0
--- /dev/null
+++ b/py-polars/src/lazyframe/data_frame_iter.rs
@@ -0,0 +1,38 @@
+use crossbeam_channel::Receiver;
+use polars::frame::DataFrame;
+use pyo3::prelude::*;
+
+use crate::dataframe::PyDataFrame;
+
+#[pyclass]
+pub struct DataFrameIter {
+    pub df_receiver: Receiver<DataFrame>,
+    pub limit: Option<usize>,
+    pub num_rows: usize,
+}
+
+#[pymethods]
+impl DataFrameIter {
+    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+        slf
+    }
+
+    fn __next__(mut slf: PyRefMut<'_, Self>) -> Option<PyDataFrame> {
+        if slf.limit.is_some() && slf.num_rows >= slf.limit.unwrap() {
+            return None;
+        }
+        let mut df = match slf.df_receiver.recv() {
+            Ok(df) => df,
+            Err(_) => {
+                return None;
+            },
+        };
+
+        if slf.limit.is_some() && slf.limit.unwrap() - slf.num_rows < df.height() {
+            let limit = slf.limit.unwrap() - slf.num_rows;
+            df = df.head(Some(limit));
+        }
+        slf.num_rows += df.height();
+        Some(df.into())
+    }
+}
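`DataFrameIter::__next__` above enforces an optional row limit: once `num_rows` has reached `limit` it stops, and a batch that would overshoot is truncated with `DataFrame::head` so no more than `limit` rows are ever yielded. A small sketch of that arithmetic in isolation; `clip_to_limit` is a hypothetical helper, not part of the diff:

use polars::prelude::*;

// Mirrors the limit handling in __next__: stop at the limit and clip the batch
// that would cross it.
fn clip_to_limit(df: DataFrame, num_rows: &mut usize, limit: Option<usize>) -> Option<DataFrame> {
    if let Some(limit) = limit {
        if *num_rows >= limit {
            return None;
        }
        let remaining = limit - *num_rows;
        let df = if remaining < df.height() {
            df.head(Some(remaining))
        } else {
            df
        };
        *num_rows += df.height();
        return Some(df);
    }
    *num_rows += df.height();
    Some(df)
}

fn main() -> PolarsResult<()> {
    let batch = df!("col_1" => vec![1i64, 2, 3, 4, 5])?;
    let mut num_rows = 0usize;

    // The first batch fits entirely; the second is clipped to the remaining 2 rows.
    let first = clip_to_limit(batch.clone(), &mut num_rows, Some(7)).unwrap();
    let second = clip_to_limit(batch, &mut num_rows, Some(7)).unwrap();
    assert_eq!(first.height(), 5);
    assert_eq!(second.height(), 2);
    Ok(())
}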
diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs
index 7aa96c9640a9..ea804d072a71 100644
--- a/py-polars/src/lazyframe/mod.rs
+++ b/py-polars/src/lazyframe/mod.rs
@@ -1,6 +1,7 @@
 mod exitable;
 mod visit;
 pub(crate) mod visitor;
+mod data_frame_iter;
 use std::collections::HashMap;
 use std::io::BufWriter;
 use std::num::NonZeroUsize;
@@ -21,6 +22,7 @@ use crate::error::PyPolarsErr;
 use crate::expr::ToExprs;
 use crate::file::get_file_like;
 use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
+use crate::lazyframe::data_frame_iter::DataFrameIter;
 use crate::lazyframe::visit::NodeTraverser;
 use crate::prelude::*;
 use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};
@@ -651,6 +653,26 @@ impl PyLazyFrame {
         });
     }
 
+    #[cfg(feature = "streaming")]
+    fn sink_to_batches(
+        &self,
+        py: Python,
+        limit: Option<usize>,
+    ) -> PyResult<DataFrameIter> {
+        // If we don't allow threads and we have UDFs trying to acquire the GIL from
+        // different threads, we deadlock.
+        let df_receiver = py.allow_threads(|| {
+            let ldf = self.ldf.clone();
+            ldf.sink_to_batches().map_err(PyPolarsErr::from)
+        })?;
+        let df_iter = DataFrameIter {
+            df_receiver,
+            limit,
+            num_rows: 0,
+        };
+        Ok(df_iter)
+    }
+
     #[cfg(all(feature = "streaming", feature = "parquet"))]
     #[pyo3(signature = (path, compression, compression_level, statistics, row_group_size, data_pagesize_limit, maintain_order))]
     fn sink_parquet(
diff --git a/py-polars/tests/unit/streaming/test_streaming_batches.py b/py-polars/tests/unit/streaming/test_streaming_batches.py
new file mode 100644
index 000000000000..3cb1c50c9496
--- /dev/null
+++ b/py-polars/tests/unit/streaming/test_streaming_batches.py
@@ -0,0 +1,11 @@
+import polars as pl
+
+
+def test_stream_data_in_batches() -> None:
+    df = pl.DataFrame({"col_1": [0] * 5 + [1] * 5})
+
+    dfs = list(df.lazy().collect_batches(
+        streaming=True
+    ))
+
+    assert len(dfs) == 11
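The new test only drains the generator to completion. Because `BatchSink` treats a failed `send` as `SinkResult::Finished`, a consumer can also stop early by simply dropping the receiving side. A sketch of that early-exit path in Rust; the input is illustrative, the `lazy` and `streaming` features are assumed, and it relies on the sink noticing the disconnect on its next `send`:

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // Illustrative input; in practice this would be a large scan.
    let lf = df!("col_1" => vec![0i64; 1000])?.lazy();

    let batches = lf.sink_to_batches()?;

    // Take only the first batch, then drop the receiver. Once the channel is
    // disconnected, the sink's next send fails, it reports SinkResult::Finished,
    // and the background streaming query can wind down instead of materializing
    // every remaining morsel.
    if let Ok(first) = batches.recv() {
        println!("first batch: {} rows", first.height());
    }
    drop(batches);
    Ok(())
}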