fix: Fix streaming glob slice (#16174)
ritchie46 authored May 12, 2024
1 parent 1b97b6d commit 3892c75
Showing 8 changed files with 59 additions and 96 deletions.
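The user-visible bug: a head()/slice() on a streaming scan over a glob of CSV files did not return exactly the requested number of rows. A minimal sketch of the fixed behavior, mirroring the test added at the end of this commit (the glob path is made up; any directory with several matching CSV files will do):

import polars as pl

# Scan every file matching the glob, then keep only the first 5 rows while streaming.
df = pl.scan_csv("data/foods*.csv").head(5).collect(streaming=True)
assert df.height == 5  # with this fix, the slice is honored across files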
3 changes: 3 additions & 0 deletions crates/polars-core/src/frame/mod.rs
@@ -2249,6 +2249,9 @@ impl DataFrame {
         if offset == 0 && length == self.height() {
             return self.clone();
         }
+        if length == 0 {
+            return self.clear();
+        }
         let col = self
             .columns
             .iter()
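The new early return gives length-0 slices a fast path: an empty frame that keeps the original schema. Illustrated from the Python API (this is the general contract of slice, shown only to make the Rust change concrete):

import polars as pl

df = pl.DataFrame({"a": [1, 2, 3]})
empty = df.slice(0, 0)  # zero rows, same schema
assert empty.shape == (0, 1)
assert empty.schema == df.schema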
22 changes: 12 additions & 10 deletions crates/polars-io/src/csv/read/read_impl/batched_mmap.rs
@@ -170,7 +170,7 @@ impl<'a> CoreReader<'a> {
             to_cast: self.to_cast,
             ignore_errors: self.ignore_errors,
             truncate_ragged_lines: self.truncate_ragged_lines,
-            n_rows: self.n_rows,
+            remaining: self.n_rows.unwrap_or(usize::MAX),
             encoding: self.encoding,
             separator: self.separator,
             schema: self.schema,
@@ -197,7 +197,7 @@ pub struct BatchedCsvReaderMmap<'a> {
     truncate_ragged_lines: bool,
     to_cast: Vec<Field>,
     ignore_errors: bool,
-    n_rows: Option<usize>,
+    remaining: usize,
     encoding: CsvEncoding,
     separator: u8,
     schema: SchemaRef,
@@ -211,14 +211,9 @@ pub struct BatchedCsvReaderMmap<'a> {
 
 impl<'a> BatchedCsvReaderMmap<'a> {
     pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
-        if n == 0 {
+        if n == 0 || self.remaining == 0 {
             return Ok(None);
         }
-        if let Some(n_rows) = self.n_rows {
-            if self.rows_read >= n_rows as IdxSize {
-                return Ok(None);
-            }
-        }
 
         // get next `n` offset positions.
         let file_chunks_iter = (&mut self.file_chunks_iter).take(n);
@@ -274,8 +269,15 @@ impl<'a> BatchedCsvReaderMmap<'a> {
         if self.row_index.is_some() {
             update_row_counts2(&mut chunks, self.rows_read)
         }
-        for df in &chunks {
-            self.rows_read += df.height() as IdxSize;
+        for df in &mut chunks {
+            let h = df.height();
+
+            if self.remaining < h {
+                *df = df.slice(0, self.remaining)
+            };
+            self.remaining = self.remaining.saturating_sub(h);
+
+            self.rows_read += h as IdxSize;
         }
         Ok(Some(chunks))
     }
28 changes: 15 additions & 13 deletions crates/polars-io/src/csv/read/read_impl/batched_read.rs
@@ -246,7 +246,6 @@ impl<'a> CoreReader<'a> {
 
         Ok(BatchedCsvReaderRead {
             chunk_size: self.chunk_size,
-            finished: false,
             file_chunk_reader: chunk_iter,
             file_chunks: vec![],
             projection,
@@ -260,20 +259,20 @@ impl<'a> CoreReader<'a> {
             to_cast: self.to_cast,
             ignore_errors: self.ignore_errors,
             truncate_ragged_lines: self.truncate_ragged_lines,
-            n_rows: self.n_rows,
+            remaining: self.n_rows.unwrap_or(usize::MAX),
             encoding: self.encoding,
             separator: self.separator,
             schema: self.schema,
             rows_read: 0,
             _cat_lock,
             decimal_comma: self.decimal_comma,
+            finished: false,
         })
     }
 }
 
 pub struct BatchedCsvReaderRead<'a> {
     chunk_size: usize,
-    finished: bool,
     file_chunk_reader: ChunkReader<'a>,
     file_chunks: Vec<(SyncPtr<u8>, usize)>,
     projection: Vec<usize>,
@@ -287,7 +286,7 @@ pub struct BatchedCsvReaderRead<'a> {
     to_cast: Vec<Field>,
     ignore_errors: bool,
     truncate_ragged_lines: bool,
-    n_rows: Option<usize>,
+    remaining: usize,
     encoding: CsvEncoding,
     separator: u8,
     schema: SchemaRef,
@@ -297,19 +296,15 @@ pub struct BatchedCsvReaderRead<'a> {
     #[cfg(not(feature = "dtype-categorical"))]
     _cat_lock: Option<u8>,
     decimal_comma: bool,
+    finished: bool,
 }
 //
 impl<'a> BatchedCsvReaderRead<'a> {
     /// `n` number of batches.
     pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
-        if n == 0 || self.finished {
+        if n == 0 || self.remaining == 0 || self.finished {
             return Ok(None);
         }
-        if let Some(n_rows) = self.n_rows {
-            if self.rows_read >= n_rows as IdxSize {
-                return Ok(None);
-            }
-        }
 
         // get next `n` offset positions.
 
@@ -331,7 +326,7 @@ impl<'a> BatchedCsvReaderRead<'a> {
             // get the final slice
             self.file_chunks
                 .push(self.file_chunk_reader.get_buf_remaining());
-            self.finished = true
+            self.finished = true;
         }
 
         // depleted the offsets iterator, we are done as well.
@@ -380,8 +375,15 @@ impl<'a> BatchedCsvReaderRead<'a> {
         if self.row_index.is_some() {
             update_row_counts2(&mut chunks, self.rows_read)
         }
-        for df in &chunks {
-            self.rows_read += df.height() as IdxSize;
+        for df in &mut chunks {
+            let h = df.height();
+
+            if self.remaining < h {
+                *df = df.slice(0, self.remaining)
+            };
+            self.remaining = self.remaining.saturating_sub(h);
+
+            self.rows_read += h as IdxSize;
         }
         Ok(Some(chunks))
     }
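Both batched CSV readers now track a remaining row budget instead of comparing rows_read against an optional n_rows: the chunk that crosses the limit is sliced down, and once the budget hits zero further calls return None. A standalone sketch of that bookkeeping (the helper name and the use of whole DataFrames as chunks are illustrative, not part of the Polars API):

import polars as pl

def take_rows(chunks: list[pl.DataFrame], limit: int) -> list[pl.DataFrame]:
    # Mirrors the reader loop above: truncate the chunk that crosses the limit,
    # then burn down the budget so later batches yield nothing.
    remaining = limit
    out = []
    for df in chunks:
        if remaining == 0:
            break
        h = df.height
        if remaining < h:
            df = df.slice(0, remaining)
        remaining = max(remaining - h, 0)
        out.append(df)
    return out

chunks = [pl.DataFrame({"x": [0, 1, 2, 3]}), pl.DataFrame({"x": [4, 5, 6, 7]})]
assert sum(df.height for df in take_rows(chunks, 5)) == 5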
@@ -68,10 +68,12 @@ fn jit_insert_slice(
     sink_nodes: &mut Vec<(usize, Node, Rc<RefCell<u32>>)>,
     operator_offset: usize,
 ) {
-    // if the join/union has a slice, we add a new slice node
+    // if the join has a slice, we add a new slice node
     // note that we take the offset + 1, because we want to
     // slice AFTER the join has happened and the join will be an
     // operator
+    // NOTE: Don't do this for union, that doesn't work.
+    // TODO! Deal with this in the optimizer.
     use IR::*;
     let (offset, len) = match lp_arena.get(node) {
         Join { options, .. } if options.args.slice.is_some() => {
@@ -80,19 +82,11 @@ fn jit_insert_slice(
             };
             (offset, len)
         },
-        Union {
-            options:
-                UnionOptions {
-                    slice: Some((offset, len)),
-                    ..
-                },
-            ..
-        } => (*offset, *len),
         _ => return,
     };
 
     let slice_node = lp_arena.add(Slice {
-        input: Node::default(),
+        input: node,
         offset,
         len: len as IdxSize,
     });
@@ -178,7 +172,6 @@ pub(super) fn construct(
             },
             PipelineNode::Union(node) => {
                 operator_nodes.push(node);
-                jit_insert_slice(node, lp_arena, &mut sink_nodes, operator_offset);
                 let op = get_operator(node, lp_arena, expr_arena, &to_physical_piped_expr)?;
                 operators.push(op);
             },
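The comment above is why the slice node is wired in after the join operator: the slice stored in the join options refers to the joined result, so pushing it below the join would change the outcome. A small illustration with made-up frames (only row counts are compared, to stay independent of join row ordering):

import polars as pl

left = pl.DataFrame({"k": [1, 2, 3, 4]})
right = pl.DataFrame({"k": [3, 4]})

# Slicing after the join returns the requested number of rows...
assert left.join(right, on="k").head(2).height == 2
# ...while slicing an input first can starve the join entirely.
assert left.head(2).join(right, on="k").height == 0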
62 changes: 2 additions & 60 deletions crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs
@@ -81,21 +81,6 @@ fn insert_file_sink(mut root: Node, lp_arena: &mut Arena<IR>) -> Node {
     root
 }
 
-fn insert_slice(
-    root: Node,
-    offset: i64,
-    len: IdxSize,
-    lp_arena: &mut Arena<IR>,
-    state: &mut Branch,
-) {
-    let node = lp_arena.add(IR::Slice {
-        input: root,
-        offset,
-        len: len as IdxSize,
-    });
-    state.operators_sinks.push(PipelineNode::Sink(node));
-}
-
 pub(crate) fn insert_streaming_nodes(
     root: Node,
     lp_arena: &mut Arena<IR>,
@@ -244,20 +229,8 @@ pub(crate) fn insert_streaming_nodes(
                     )
                 }
             },
-            Scan {
-                file_options: options,
-                scan_type,
-                ..
-            } if scan_type.streamable() => {
+            Scan { scan_type, .. } if scan_type.streamable() => {
                 if state.streamable {
-                    #[cfg(feature = "csv")]
-                    if matches!(scan_type, FileScan::Csv { .. }) {
-                        // the batched csv reader doesn't stop exactly at n_rows
-                        if let Some(n_rows) = options.n_rows {
-                            insert_slice(root, 0, n_rows as IdxSize, lp_arena, &mut state);
-                        }
-                    }
-
                     state.sources.push(root);
                     pipeline_trees[current_idx].push(state)
                 }
@@ -320,38 +293,7 @@ pub(crate) fn insert_streaming_nodes(
                 state.sources.push(root);
                 pipeline_trees[current_idx].push(state);
             },
-            Union {
-                options:
-                    UnionOptions {
-                        slice: Some((offset, len)),
-                        ..
-                    },
-                ..
-            } if *offset >= 0 => {
-                insert_slice(root, *offset, *len as IdxSize, lp_arena, &mut state);
-                state.streamable = true;
-                let Union { inputs, .. } = lp_arena.get(root) else {
-                    unreachable!()
-                };
-                for (i, input) in inputs.iter().enumerate() {
-                    let mut state = if i == 0 {
-                        // Note the clone!
-                        let mut state = state.clone();
-                        state.join_count += inputs.len() as u32 - 1;
-                        state
-                    } else {
-                        let mut state = state.split_from_sink();
-                        state.join_count = 0;
-                        state
-                    };
-                    state.operators_sinks.push(PipelineNode::Union(root));
-                    stack.push(StackFrame::new(*input, state, current_idx));
-                }
-            },
-            Union {
-                inputs,
-                options: UnionOptions { slice: None, .. },
-            } => {
+            Union { inputs, .. } => {
                 {
                     state.streamable = true;
                     for (i, input) in inputs.iter().enumerate() {
13 changes: 11 additions & 2 deletions crates/polars-plan/src/logical_plan/optimizer/slice_pushdown_lp.rs
@@ -209,15 +209,24 @@ impl SlicePushDown {
                 Ok(lp)
             }
             (Union {mut inputs, mut options }, Some(state)) => {
-                options.slice = Some((state.offset, state.len as usize));
                 if state.offset == 0 {
                     for input in &mut inputs {
                         let input_lp = lp_arena.take(*input);
                         let input_lp = self.pushdown(input_lp, Some(state), lp_arena, expr_arena)?;
                         lp_arena.replace(*input, input_lp);
                     }
                 }
-                Ok(Union {inputs, options})
+                // The in-memory union node is slice aware.
+                // We still set this information, but the streaming engine will ignore it.
+                options.slice = Some((state.offset, state.len as usize));
+                let lp = Union {inputs, options};
+
+                if self.streaming {
+                    // Ensure the slice node remains.
+                    self.no_pushdown_finish_opt(lp, Some(state), lp_arena)
+                } else {
+                    Ok(lp)
+                }
             },
             (Join {
                 input_left,
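Pushing the slice into each union input is only sound for a zero offset: taking the first len rows of every input can never drop rows the outer slice still needs, whereas a non-zero offset may straddle input boundaries. The slice information is still recorded on the union (and, under streaming, an outer slice node is kept) so the final result is trimmed exactly. A quick check of that invariant with made-up frames:

import polars as pl

a = pl.DataFrame({"x": [1, 2, 3]})
b = pl.DataFrame({"x": [4, 5, 6]})

# Offset 0: pre-limiting each input before concatenating preserves the result.
assert pl.concat([a.head(2), b.head(2)]).head(2).equals(pl.concat([a, b]).head(2))

# Non-zero offset: slice(4, 2) needs rows of `b` that a per-input head(2) dropped.
assert not pl.concat([a.head(2), b.head(2)]).slice(4, 2).equals(pl.concat([a, b]).slice(4, 2))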
7 changes: 7 additions & 0 deletions crates/polars-utils/src/arena.rs
@@ -104,6 +104,13 @@ impl<T> Arena<T> {
     }
 }
 
+impl<T: Clone> Arena<T> {
+    pub fn duplicate(&mut self, node: Node) -> Node {
+        let item = self.items[node.0].clone();
+        self.add(item)
+    }
+}
+
 impl<T: Default> Arena<T> {
     #[inline]
     pub fn take(&mut self, idx: Node) -> T {
5 changes: 5 additions & 0 deletions py-polars/tests/unit/streaming/test_streaming_io.py
@@ -30,6 +30,11 @@ def test_scan_slice_streaming(io_files_path: Path) -> None:
     df = pl.scan_csv(foods_file_path).head(5).collect(streaming=True)
     assert df.shape == (5, 4)
 
+    # globbing
+    foods_file_path = io_files_path / "foods*.csv"
+    df = pl.scan_csv(foods_file_path).head(5).collect(streaming=True)
+    assert df.shape == (5, 4)
+
 
 @pytest.mark.parametrize("dtype", [pl.Int8, pl.UInt8, pl.Int16, pl.UInt16])
 def test_scan_csv_overwrite_small_dtypes(