Skip to content

Commit

Permalink
horizontal
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed May 3, 2024
1 parent 66a3e82 commit 45f4e92
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 35 deletions.
16 changes: 1 addition & 15 deletions crates/polars-lazy/src/dsl/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,11 @@ pub fn concat_lf_horizontal<L: AsRef<[LazyFrame]>>(
opt_state.file_caching |= lf.opt_state.file_caching;
}

let mut lps = Vec::with_capacity(lfs.len());
let mut schemas = Vec::with_capacity(lfs.len());

for lf in lfs.iter() {
let mut lf = lf.clone();
let schema = lf.schema()?;
schemas.push(schema);
let lp = std::mem::take(&mut lf.logical_plan);
lps.push(lp);
}

let combined_schema = merge_schemas(&schemas)?;

let options = HConcatOptions {
parallel: args.parallel,
};
let lp = DslPlan::HConcat {
inputs: lps,
schema: Arc::new(combined_schema),
inputs: lfs.iter().map(|lf| lf.logical_plan.clone()).collect(),
options,
};
let mut lf = LazyFrame::from(lp);
Expand Down
21 changes: 17 additions & 4 deletions crates/polars-plan/src/logical_plan/conversion/convert_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,19 @@ pub(super) fn convert_st_union(
Ok(())
}

/// Looks up each node of `inputs` in `lp_arena` and returns an owned copy of
/// its schema, in the same order as `inputs`.
fn nodes_to_schemas(inputs: &[Node], lp_arena: &mut Arena<IR>) -> Vec<SchemaRef> {
    let mut schemas = Vec::with_capacity(inputs.len());
    for node in inputs {
        // `schema` hands back a borrowed-or-owned value; take ownership so the
        // result does not keep borrowing from the arena.
        let schema = lp_arena.get(*node).schema(lp_arena).into_owned();
        schemas.push(schema);
    }
    schemas
}

pub(super) fn convert_diagonal_concat(
mut inputs: Vec<Node>,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
) -> Vec<Node> {
let schemas = inputs
.iter()
.map(|n| lp_arena.get(*n).schema(lp_arena).into_owned())
.collect::<Vec<_>>();
let schemas = nodes_to_schemas(&inputs, lp_arena);

let upper_bound_width = schemas.iter().map(|sch| sch.len()).sum();

Expand Down Expand Up @@ -103,3 +107,12 @@ pub(super) fn convert_diagonal_concat(
inputs
}
}

/// Computes the output schema of a horizontal concat over `inputs`.
///
/// The schema of every input node is collected from `lp_arena` and the
/// resulting list is combined with `merge_schemas`.
///
/// # Errors
///
/// Propagates any error returned by `merge_schemas`.
pub(super) fn h_concat_schema(
    inputs: &[Node],
    lp_arena: &mut Arena<IR>,
) -> PolarsResult<SchemaRef> {
    let merged = merge_schemas(&nodes_to_schemas(inputs, lp_arena))?;
    Ok(Arc::new(merged))
}
11 changes: 5 additions & 6 deletions crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,15 @@ pub fn to_alp_impl(
let options = args.into();
IR::Union { inputs, options }
},
DslPlan::HConcat {
inputs,
schema,
options,
} => {
DslPlan::HConcat { inputs, options } => {
let inputs = inputs
.into_iter()
.map(|lp| to_alp_impl(lp, expr_arena, lp_arena, convert))
.collect::<PolarsResult<_>>()
.collect::<PolarsResult<Vec<_>>>()
.map_err(|e| e.context(failed_input!(horizontal concat)))?;

let schema = convert_utils::h_concat_schema(&inputs, lp_arena)?;

IR::HConcat {
inputs,
schema,
Expand Down
8 changes: 2 additions & 6 deletions crates/polars-plan/src/logical_plan/conversion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,14 @@ impl IR {
},
IR::HConcat {
inputs,
schema,
schema: _,
options,
} => {
let inputs = inputs
.into_iter()
.map(|node| convert_to_lp(node, lp_arena))
.collect();
DslPlan::HConcat {
inputs,
schema: schema.clone(),
options,
}
DslPlan::HConcat { inputs, options }
},
IR::Slice { input, offset, len } => {
let lp = convert_to_lp(input, lp_arena);
Expand Down
3 changes: 1 addition & 2 deletions crates/polars-plan/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ pub enum DslPlan {
/// Horizontal concatenation of multiple plans
HConcat {
inputs: Vec<DslPlan>,
schema: SchemaRef,
options: HConcatOptions,
},
/// This allows expressions to access other tables
Expand Down Expand Up @@ -198,7 +197,7 @@ impl Clone for DslPlan {
Self::Slice { input, offset, len } => Self::Slice { input: input.clone(), offset: offset.clone(), len: len.clone() },
Self::MapFunction { input, function } => Self::MapFunction { input: input.clone(), function: function.clone() },
Self::Union { inputs, args} => Self::Union { inputs: inputs.clone(), args: args.clone() },
Self::HConcat { inputs, schema, options } => Self::HConcat { inputs: inputs.clone(), schema: schema.clone(), options: options.clone() },
Self::HConcat { inputs, options } => Self::HConcat { inputs: inputs.clone(), options: options.clone() },
Self::ExtContext { input, contexts, } => Self::ExtContext { input: input.clone(), contexts: contexts.clone() },
Self::Sink { input, payload } => Self::Sink { input: input.clone(), payload: payload.clone() },
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ pub(super) fn process_hconcat(

let mut schemas = Vec::with_capacity(inputs.len());
for input in inputs.iter() {
let schema = lp_arena.get(*input).schema(lp_arena);
schemas.push(schema.as_ref().clone());
let schema = lp_arena.get(*input).schema(lp_arena).into_owned();
schemas.push(schema);
}
let new_schema = merge_schemas(&schemas)?;
Arc::new(new_schema)
Expand Down

0 comments on commit 45f4e92

Please sign in to comment.