Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Remove 'FileCacher' optimization #15357

Merged
merged 1 commit into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 2 additions & 34 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,38 +633,12 @@ impl LazyFrame {
mut self,
check_sink: bool,
) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
let file_caching = self.opt_state.file_caching && !self.opt_state.streaming;
let mut expr_arena = Arena::with_capacity(256);
let mut lp_arena = Arena::with_capacity(128);
let mut scratch = vec![];
let lp_top =
self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?;

let finger_prints = if file_caching {
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
{
let mut fps = Vec::with_capacity(8);
collect_fingerprints(lp_top, &mut fps, &lp_arena, &expr_arena);
Some(fps)
}
#[cfg(not(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
)))]
{
None
}
} else {
None
};

// sink should be replaced
let no_file_sink = if check_sink {
!matches!(lp_arena.get(lp_top), ALogicalPlan::Sink { .. })
Expand All @@ -673,7 +647,7 @@ impl LazyFrame {
};
let physical_plan = create_physical_plan(lp_top, &mut lp_arena, &mut expr_arena)?;

let state = ExecutionState::with_finger_prints(finger_prints);
let state = ExecutionState::new();
Ok((state, physical_plan, no_file_sink))
}

Expand All @@ -696,13 +670,7 @@ impl LazyFrame {
/// ```
pub fn collect(self) -> PolarsResult<DataFrame> {
let (mut state, mut physical_plan, _) = self.prepare_collect(false)?;
let out = physical_plan.execute(&mut state);
#[cfg(debug_assertions)]
{
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
state.file_cache.assert_empty();
}
out
physical_plan.execute(&mut state)
}

/// Profile a LazyFrame.
Expand Down
21 changes: 1 addition & 20 deletions crates/polars-lazy/src/physical_plan/executors/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,6 @@ impl CsvExec {

impl Executor for CsvExec {
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
#[allow(clippy::useless_asref)]
let finger_print = FileFingerPrint {
paths: Arc::new([self.path.clone()]),
predicate: self
.predicate
.as_ref()
.map(|ae| ae.as_expression().unwrap().clone()),
slice: (self.options.skip_rows, self.file_options.n_rows),
};

let profile_name = if state.has_node_timer() {
let mut ids = vec![self.path.to_string_lossy().into()];
if self.predicate.is_some() {
Expand All @@ -73,15 +63,6 @@ impl Executor for CsvExec {
Cow::Borrowed("")
};

state.record(
|| {
state
.file_cache
.read(finger_print, self.file_options.file_counter, &mut || {
self.read()
})
},
profile_name,
)
state.record(|| self.read(), profile_name)
}
}
21 changes: 1 addition & 20 deletions crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,16 +268,6 @@ fn finish_index_and_dfs(

impl Executor for IpcExec {
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
let finger_print = FileFingerPrint {
paths: Arc::clone(&self.paths),
#[allow(clippy::useless_asref)]
predicate: self
.predicate
.as_ref()
.map(|ae| ae.as_expression().unwrap().clone()),
slice: (0, self.file_options.n_rows),
};

let profile_name = if state.has_node_timer() {
let mut ids = vec![self.paths[0].to_string_lossy().into()];
if self.predicate.is_some() {
Expand All @@ -289,15 +279,6 @@ impl Executor for IpcExec {
Cow::Borrowed("")
};

state.record(
|| {
state
.file_cache
.read(finger_print, self.file_options.file_counter, &mut || {
self.read(state.verbose())
})
},
profile_name,
)
state.record(|| self.read(state.verbose()), profile_name)
}
}
2 changes: 0 additions & 2 deletions crates/polars-lazy/src/physical_plan/executors/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ use polars_io::predicates::PhysicalIoExpr;
#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc", feature = "cse"))]
use polars_io::prelude::*;
use polars_plan::global::_set_n_rows_for_scan;
#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc", feature = "cse"))]
use polars_plan::logical_plan::FileFingerPrint;
#[cfg(feature = "ipc")]
pub(crate) use support::ConsecutiveCountState;

Expand Down
21 changes: 1 addition & 20 deletions crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,16 +379,6 @@ impl ParquetExec {

impl Executor for ParquetExec {
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
let finger_print = FileFingerPrint {
paths: self.paths.clone(),
#[allow(clippy::useless_asref)]
predicate: self
.predicate
.as_ref()
.map(|ae| ae.as_expression().unwrap().clone()),
slice: (0, self.file_options.n_rows),
};

let profile_name = if state.has_node_timer() {
let mut ids = vec![self.paths[0].to_string_lossy().into()];
if self.predicate.is_some() {
Expand All @@ -400,15 +390,6 @@ impl Executor for ParquetExec {
Cow::Borrowed("")
};

state.record(
|| {
state
.file_cache
.read(finger_print, self.file_options.file_counter, &mut || {
self.read()
})
},
profile_name,
)
state.record(|| self.read(), profile_name)
}
}
68 changes: 0 additions & 68 deletions crates/polars-lazy/src/physical_plan/file_cache.rs

This file was deleted.

7 changes: 0 additions & 7 deletions crates/polars-lazy/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,6 @@ pub mod executors;
#[cfg(any(feature = "list_eval", feature = "pivot"))]
pub(crate) mod exotic;
pub mod expressions;
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
mod file_cache;
mod node_timer;
pub mod planner;
pub(crate) mod state;
Expand Down
66 changes: 1 addition & 65 deletions crates/polars-lazy/src/physical_plan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,7 @@ use once_cell::sync::OnceCell;
use polars_core::config::verbose;
use polars_core::prelude::*;
use polars_ops::prelude::ChunkJoinOptIds;
#[cfg(any(
feature = "parquet",
feature = "csv",
feature = "ipc",
feature = "json"
))]
use polars_plan::logical_plan::FileFingerPrint;

#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
use super::file_cache::FileCache;

use crate::physical_plan::node_timer::NodeTimer;

pub type JoinTuplesCache = Arc<Mutex<PlHashMap<String, ChunkJoinOptIds>>>;
Expand Down Expand Up @@ -75,14 +61,6 @@ type CachedValue = Arc<(AtomicI64, OnceCell<DataFrame>)>;
pub struct ExecutionState {
// cached by a `.cache` call and kept in memory for the duration of the plan.
df_cache: Arc<Mutex<PlHashMap<usize, CachedValue>>>,
// cache file reads until all branches got their file, then we delete it
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
pub(crate) file_cache: FileCache,
pub(super) schema_cache: RwLock<Option<SchemaRef>>,
/// Used by Window Expression to prevent redundant grouping
pub(super) group_tuples: GroupsProxyCache,
Expand All @@ -105,13 +83,6 @@ impl ExecutionState {
Self {
df_cache: Default::default(),
schema_cache: Default::default(),
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
file_cache: FileCache::new(None),
group_tuples: Default::default(),
join_tuples: Default::default(),
branch_idx: 0,
Expand Down Expand Up @@ -163,13 +134,6 @@ impl ExecutionState {
pub(super) fn split(&self) -> Self {
Self {
df_cache: self.df_cache.clone(),
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
file_cache: self.file_cache.clone(),
schema_cache: Default::default(),
group_tuples: Default::default(),
join_tuples: Default::default(),
Expand All @@ -185,13 +149,6 @@ impl ExecutionState {
pub(super) fn clone(&self) -> Self {
Self {
df_cache: self.df_cache.clone(),
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
file_cache: self.file_cache.clone(),
schema_cache: self.schema_cache.read().unwrap().clone().into(),
group_tuples: self.group_tuples.clone(),
join_tuples: self.join_tuples.clone(),
Expand All @@ -203,27 +160,6 @@ impl ExecutionState {
}
}

#[cfg(not(any(
feature = "parquet",
feature = "csv",
feature = "ipc",
feature = "json"
)))]
pub(crate) fn with_finger_prints(_finger_prints: Option<usize>) -> Self {
Self::new()
}
#[cfg(any(
feature = "parquet",
feature = "csv",
feature = "ipc",
feature = "json"
))]
pub(crate) fn with_finger_prints(finger_prints: Option<Vec<FileFingerPrint>>) -> Self {
let mut new = Self::new();
new.file_cache = FileCache::new(finger_prints);
new
}

pub(crate) fn set_schema(&self, schema: SchemaRef) {
let mut lock = self.schema_cache.write().unwrap();
*lock = Some(schema);
Expand Down
10 changes: 0 additions & 10 deletions crates/polars-plan/src/logical_plan/file_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,6 @@ impl FileScan {
}
}

#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "cse"))]
pub(crate) fn skip_rows(&self) -> usize {
#[allow(unreachable_patterns)]
match self {
#[cfg(feature = "csv")]
Self::Csv { options } => options.skip_rows,
_ => 0,
}
}

pub(crate) fn sort_projection(&self, _file_options: &FileScanOptions) -> bool {
match self {
#[cfg(feature = "csv")]
Expand Down
8 changes: 0 additions & 8 deletions crates/polars-plan/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,6 @@ use serde::{Deserialize, Serialize};
use strum_macros::IntoStaticStr;

use self::tree_format::{TreeFmtNode, TreeFmtVisitor};
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "cse",
feature = "json"
))]
pub use crate::logical_plan::optimizer::file_caching::FileFingerPrint;

pub type ColumnName = Arc<str>;

Expand Down
Loading
Loading