Skip to content

Commit

Permalink
avoid use of rewrite_projections
Browse files Browse the repository at this point in the history
  • Loading branch information
alexander-beedie committed Jun 23, 2024
1 parent 2d2e9c4 commit dd6b2c4
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 171 deletions.
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/conversion/expr_expansion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ fn find_flags(expr: &Expr) -> PolarsResult<ExpansionFlags> {

/// If the projection is a single `col(*)`, do nothing: selecting nothing is the same as selecting all columns.
/// In all other cases, replace the wildcard with an expression containing all columns.
pub fn rewrite_projections(
pub(crate) fn rewrite_projections(
exprs: Vec<Expr>,
schema: &Schema,
keys: &[Expr],
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/conversion/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod convert_utils;
mod dsl_to_ir;
pub(crate) mod expr_expansion;
mod expr_expansion;
mod expr_to_ir;
mod ir_to_dsl;
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub(crate) mod ir;
mod apply;
mod builder_dsl;
mod builder_ir;
pub mod conversion;
pub(crate) mod conversion;
#[cfg(feature = "debugging")]
pub(crate) mod debug;
pub mod expr_ir;
Expand Down
1 change: 0 additions & 1 deletion crates/polars-plan/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ pub(crate) use polars_time::prelude::*;
pub use polars_utils::arena::{Arena, Node};

pub use crate::dsl::*;
pub use crate::plans::conversion::expr_expansion::rewrite_projections;
#[cfg(feature = "debugging")]
pub use crate::plans::debug::*;
pub use crate::plans::options::*;
Expand Down
178 changes: 71 additions & 107 deletions crates/polars-sql/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use sqlparser::parser::{Parser, ParserOptions};

use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
use crate::sql_expr::{
parse_sql_array, parse_sql_expr, process_join_constraint, to_sql_interface_err,
parse_sql_array, parse_sql_expr, process_join_constraint, resolve_compound_identifier,
to_sql_interface_err,
};
use crate::table_functions::PolarsTableFunctions;

Expand Down Expand Up @@ -602,43 +603,43 @@ impl SQLContext {
}
self.execute_from_statement(from.first().unwrap())?
};
let mut contains_wildcard = false;
let mut contains_wildcard_exclude = false;

// Filter expression.
let schema = Some(lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?);
let schema = lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?;
lf = self.process_where(lf, &select_stmt.selection)?;

// Column projections.
let mut excluded_cols = Vec::new();
let projections: Vec<Expr> = select_stmt
.projection
.iter()
.map(|select_item| {
Ok(match select_item {
SelectItem::UnnamedExpr(expr) => {
vec![parse_sql_expr(expr, self, schema.as_deref())?]
vec![parse_sql_expr(expr, self, Some(schema.deref()))?]
},
SelectItem::ExprWithAlias { expr, alias } => {
let expr = parse_sql_expr(expr, self, schema.as_deref())?;
let expr = parse_sql_expr(expr, self, Some(schema.deref()))?;
vec![expr.alias(&alias.value)]
},
SelectItem::QualifiedWildcard(obj_name, wildcard_options) => {
let expanded = self.process_qualified_wildcard(
SelectItem::QualifiedWildcard(obj_name, wildcard_options) => self
.process_qualified_wildcard(
obj_name,
wildcard_options,
&mut contains_wildcard_exclude,
schema.as_deref(),
)?;
rewrite_projections(vec![expanded], &(schema.clone().unwrap()), &[])?
},
&mut excluded_cols,
Some(schema.deref()),
)?,
SelectItem::Wildcard(wildcard_options) => {
contains_wildcard = true;
let e = col("*");
vec![self.process_wildcard_additional_options(
e,
let cols = schema
.iter_names()
.map(|name| col(name))
.collect::<Vec<_>>();

self.process_wildcard_additional_options(
cols,
wildcard_options,
&mut contains_wildcard_exclude,
)?]
&mut excluded_cols,
)?
},
})
})
Expand All @@ -656,7 +657,13 @@ impl SQLContext {
group_by_keys = group_by_exprs
.iter()
.map(|e| {
self.expr_or_ordinal(e, &projections, None, schema.as_deref(), "GROUP BY")
self.expr_or_ordinal(
e,
&projections,
None,
Some(schema.deref()),
"GROUP BY",
)
})
.collect::<PolarsResult<_>>()?
},
Expand Down Expand Up @@ -689,73 +696,37 @@ impl SQLContext {
};

lf = if group_by_keys.is_empty() {
if query.order_by.is_empty() {
lf = if query.order_by.is_empty() {
// No sort, select cols as given
lf.select(projections)
} else if !contains_wildcard {
let mut retained_names = PlIndexSet::with_capacity(projections.len());
let schema = lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?;
} else {
// Add all projections to the base frame as any of
// the original columns may be required for the sort
lf = lf.with_columns(projections.clone());

projections.iter().for_each(|expr| match expr {
Expr::Alias(_, name) => {
retained_names.insert(name.clone());
},
Expr::Column(name) => {
retained_names.insert(name.clone());
},
Expr::Columns(names) => names.iter().for_each(|name| {
retained_names.insert(name.clone());
}),
Expr::Exclude(inner_expr, excludes) => {
if let Expr::Columns(names) = (*inner_expr).as_ref() {
names.iter().for_each(|name| {
retained_names.insert(name.clone());
})
}
excludes.iter().for_each(|excluded| {
if let Excluded::Name(name) = excluded {
retained_names.shift_remove(name);
}
})
},
_ => {
let field = expr.to_field(&schema, Context::Default).unwrap();
retained_names.insert(ColumnName::from(field.name.as_str()));
},
});
let retained_columns: Vec<_> =
retained_names.into_iter().map(|name| col(&name)).collect();

lf = lf.with_columns(projections);
lf = self.process_order_by(lf, &query.order_by, Some(retained_columns.as_ref()))?;
lf.select(&retained_columns)
} else if contains_wildcard_exclude {
let mut dropped_names = Vec::with_capacity(projections.len());
let exclude_expr = projections.iter().find(|expr| {
if let Expr::Exclude(_, excludes) = expr {
for excluded in excludes.iter() {
if let Excluded::Name(name) = excluded {
dropped_names.push(name.to_string());
}
}
true
} else {
false
}
});
if exclude_expr.is_some() {
lf = lf.with_columns(projections);
lf = self.process_order_by(lf, &query.order_by, None)?;
lf.drop(dropped_names)
} else {
lf = lf.select(projections);
self.process_order_by(lf, &query.order_by, None)?
}
// Final/selected cols (also ensures accurate ordinal position refs)
let retained_cols = projections
.iter()
.map(|e| {
col(e
.to_field(schema.deref(), Context::Default)
.unwrap()
.name
.as_str())
})
.collect::<Vec<_>>();

lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;
lf.select(retained_cols)
};
// Discard any excluded cols
if !excluded_cols.is_empty() {
lf.drop(excluded_cols)
} else {
lf = lf.select(projections);
self.process_order_by(lf, &query.order_by, None)?
lf
}
} else {
lf = self.process_group_by(lf, contains_wildcard, &group_by_keys, &projections)?;
lf = self.process_group_by(lf, &group_by_keys, &projections)?;
lf = self.process_order_by(lf, &query.order_by, None)?;

// Apply optional 'having' clause, post-aggregation.
Expand Down Expand Up @@ -784,7 +755,7 @@ impl SQLContext {
})
.collect::<PolarsResult<Vec<_>>>()?;

// DISTINCT ON applies the ORDER BY before the operation.
// DISTINCT ON has to apply the ORDER BY before the operation.
if !query.order_by.is_empty() {
lf = self.process_order_by(lf, &query.order_by, None)?;
}
Expand Down Expand Up @@ -1057,14 +1028,9 @@ impl SQLContext {
fn process_group_by(
&mut self,
mut lf: LazyFrame,
contains_wildcard: bool,
group_by_keys: &[Expr],
projections: &[Expr],
) -> PolarsResult<LazyFrame> {
polars_ensure!(
!contains_wildcard,
SQLSyntax: "GROUP BY error (cannot process wildcard in group_by)"
);
let schema_before = lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?;
let group_by_keys_schema =
expressions_to_schema(group_by_keys, &schema_before, Context::Default)?;
Expand Down Expand Up @@ -1193,22 +1159,22 @@ impl SQLContext {
&mut self,
ObjectName(idents): &ObjectName,
options: &WildcardAdditionalOptions,
contains_wildcard_exclude: &mut bool,
excluded_cols: &mut Vec<String>,
schema: Option<&Schema>,
) -> PolarsResult<Expr> {
) -> PolarsResult<Vec<Expr>> {
let mut new_idents = idents.clone();
new_idents.push(Ident::new("*"));
let identifier = SQLExpr::CompoundIdentifier(new_idents);
let expr = parse_sql_expr(&identifier, self, schema)?;
self.process_wildcard_additional_options(expr, options, contains_wildcard_exclude)

let expr = resolve_compound_identifier(self, new_idents.deref(), schema);
self.process_wildcard_additional_options(expr?, options, excluded_cols)
}

fn process_wildcard_additional_options(
&mut self,
expr: Expr,
exprs: Vec<Expr>,
options: &WildcardAdditionalOptions,
contains_wildcard_exclude: &mut bool,
) -> PolarsResult<Expr> {
excluded_cols: &mut Vec<String>,
) -> PolarsResult<Vec<Expr>> {
// bail on unsupported wildcard options
if options.opt_ilike.is_some() {
polars_bail!(SQLSyntax: "ILIKE wildcard option is unsupported")
Expand All @@ -1220,17 +1186,15 @@ impl SQLContext {
polars_bail!(SQLSyntax: "EXCEPT wildcard option is unsupported (use EXCLUDE instead)")
}

Ok(match &options.opt_exclude {
Some(ExcludeSelectItem::Single(ident)) => {
*contains_wildcard_exclude = true;
expr.exclude(vec![&ident.value])
},
Some(ExcludeSelectItem::Multiple(idents)) => {
*contains_wildcard_exclude = true;
expr.exclude(idents.iter().map(|i| &i.value))
},
_ => expr,
})
if let Some(exc_items) = &options.opt_exclude {
*excluded_cols = match exc_items {
ExcludeSelectItem::Single(ident) => vec![ident.value.clone()],
ExcludeSelectItem::Multiple(idents) => {
idents.iter().map(|i| i.value.clone()).collect()
},
};
}
Ok(exprs)
}

fn rename_columns_from_table_alias(
Expand Down
Loading

0 comments on commit dd6b2c4

Please sign in to comment.