Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust,python,cli): add SQL support for UNION [ALL] BY NAME, add "diagonal_relaxed" strategy for pl.concat #11597

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/polars-core/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ where
/// Concat [`DataFrame`]s horizontally.
#[cfg(feature = "horizontal_concat")]
/// Concat horizontally and extend with null values if lengths don't match
pub fn hor_concat_df(dfs: &[DataFrame]) -> PolarsResult<DataFrame> {
pub fn concat_df_horizontal(dfs: &[DataFrame]) -> PolarsResult<DataFrame> {
let max_len = dfs
.iter()
.map(|df| df.height())
Expand Down Expand Up @@ -98,7 +98,7 @@ pub fn hor_concat_df(dfs: &[DataFrame]) -> PolarsResult<DataFrame> {
/// Concat [`DataFrame`]s diagonally.
#[cfg(feature = "diagonal_concat")]
/// Concat diagonally thereby combining different schemas.
pub fn diag_concat_df(dfs: &[DataFrame]) -> PolarsResult<DataFrame> {
pub fn concat_df_diagonal(dfs: &[DataFrame]) -> PolarsResult<DataFrame> {
// TODO! replace with lazy only?
let upper_bound_width = dfs.iter().map(|df| df.width()).sum();
let mut column_names = AHashSet::with_capacity(upper_bound_width);
Expand Down Expand Up @@ -175,7 +175,7 @@ mod test {
"d" => [1, 2]
]?;

let out = diag_concat_df(&[a, b, c])?;
let out = concat_df_diagonal(&[a, b, c])?;

let expected = df![
"a" => [Some(1), Some(2), None, None, Some(5), Some(7)],
Expand Down
37 changes: 18 additions & 19 deletions crates/polars-lazy/src/dsl/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,11 @@ pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
#[cfg(feature = "diagonal_concat")]
/// Concat [LazyFrame]s diagonally.
/// Calls [`concat`][concat()] internally.
pub fn diag_concat_lf<L: AsRef<[LazyFrame]>>(
lfs: L,
rechunk: bool,
parallel: bool,
pub fn concat_lf_diagonal<L: AsRef<[LazyFrame]>>(
inputs: L,
args: UnionArgs,
) -> PolarsResult<LazyFrame> {
let lfs = lfs.as_ref().to_vec();
let lfs = inputs.as_ref();
let schemas = lfs
.iter()
.map(|lf| lf.schema())
Expand All @@ -143,12 +142,12 @@ pub fn diag_concat_lf<L: AsRef<[LazyFrame]>>(
}
});
}

let lfs_with_all_columns = lfs
.into_iter()
.iter()
// Zip Frames with their Schemas
.zip(schemas)
.map(|(mut lf, lf_schema)| {
.map(|(lf, lf_schema)| {
let mut lf = lf.clone();
for (name, dtype) in total_schema.iter() {
// If a name from Total Schema is not present - append
if lf_schema.get_field(name).is_none() {
Expand All @@ -163,19 +162,11 @@ pub fn diag_concat_lf<L: AsRef<[LazyFrame]>>(
.map(|col_name| col(col_name))
.collect::<Vec<Expr>>(),
);

Ok(reordered_lf)
})
.collect::<PolarsResult<Vec<_>>>()?;

concat(
lfs_with_all_columns,
UnionArgs {
rechunk,
parallel,
to_supertypes: false,
},
)
concat(lfs_with_all_columns, args)
}

#[derive(Clone, Copy)]
Expand All @@ -195,7 +186,7 @@ impl Default for UnionArgs {
}
}

/// Concat multiple
/// Concat multiple [`LazyFrame`]s vertically.
pub fn concat<L: AsRef<[LazyFrame]>>(inputs: L, args: UnionArgs) -> PolarsResult<LazyFrame> {
concat_impl(
inputs,
Expand Down Expand Up @@ -241,7 +232,15 @@ mod test {
"d" => [1, 2]
]?;

let out = diag_concat_lf(&[a.lazy(), b.lazy(), c.lazy()], false, false)?.collect()?;
let out = concat_lf_diagonal(
&[a.lazy(), b.lazy(), c.lazy()],
UnionArgs {
rechunk: false,
parallel: false,
..Default::default()
},
)?
.collect()?;

let expected = df![
"a" => [Some(1), Some(2), None, None, Some(5), Some(7)],
Expand Down
1 change: 1 addition & 0 deletions crates/polars-sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ default = []
ipc = ["polars-lazy/ipc"]
parquet = ["polars-lazy/parquet"]
semi_anti_join = ["polars-lazy/semi_anti_join"]
diagonal_concat = ["polars-lazy/diagonal_concat"]
28 changes: 19 additions & 9 deletions crates/polars-sql/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,21 +199,31 @@ impl SQLContext {
) -> PolarsResult<LazyFrame> {
let left = self.process_set_expr(left, query)?;
let right = self.process_set_expr(right, query)?;
let concatenated = polars_lazy::dsl::concat(
vec![left, right],
UnionArgs {
parallel: true,
..Default::default()
},
);
let opts = UnionArgs {
parallel: true,
to_supertypes: true,
..Default::default()
};
match quantifier {
// UNION ALL
SetQuantifier::All => concatenated,
SetQuantifier::All => polars_lazy::dsl::concat(vec![left, right], opts),
// UNION [DISTINCT]
SetQuantifier::Distinct | SetQuantifier::None => {
let concatenated = polars_lazy::dsl::concat(vec![left, right], opts);
concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
},
// UNION ALL BY NAME
// TODO: add recognition for SetQuantifier::DistinctByName
// when "https://github.com/sqlparser-rs/sqlparser-rs/pull/997" is available
#[cfg(feature = "diagonal_concat")]
SetQuantifier::AllByName => concat_lf_diagonal(vec![left, right], opts),
// UNION [DISTINCT] BY NAME
#[cfg(feature = "diagonal_concat")]
SetQuantifier::ByName => {
let concatenated = concat_lf_diagonal(vec![left, right], opts);
concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
},
// TODO: support "UNION [ALL] BY NAME"
#[allow(unreachable_patterns)]
_ => polars_bail!(InvalidOperation: "UNION {} is not yet supported", quantifier),
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pct_change = ["polars-core/pct_change", "polars-lazy?/pct_change"]
moment = ["polars-core/moment", "polars-lazy?/moment", "polars-ops/moment"]
range = ["polars-lazy?/range"]
true_div = ["polars-lazy?/true_div"]
diagonal_concat = ["polars-core/diagonal_concat", "polars-lazy?/diagonal_concat"]
diagonal_concat = ["polars-core/diagonal_concat", "polars-lazy?/diagonal_concat", "polars-sql?/diagonal_concat"]
horizontal_concat = ["polars-core/horizontal_concat"]
abs = ["polars-core/abs", "polars-lazy?/abs"]
dynamic_group_by = ["polars-core/dynamic_group_by", "polars-lazy?/dynamic_group_by"]
Expand Down
4 changes: 2 additions & 2 deletions docs/src/rust/user-guide/transformations/concatenation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"r2"=> &[7, 8],
"r3"=> &[9, 10],
)?;
let df_horizontal_concat = polars::functions::hor_concat_df(&[df_h1, df_h2])?;
let df_horizontal_concat = polars::functions::concat_df_horizontal(&[df_h1, df_h2])?;
println!("{}", &df_horizontal_concat);
// --8<-- [end:horizontal]

Expand All @@ -42,7 +42,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let df_d2 = df!(
"a"=> &[2],
"d"=> &[4],)?;
let df_diagonal_concat = polars::functions::diag_concat_df(&[df_d1, df_d2])?;
let df_diagonal_concat = polars::functions::concat_df_diagonal(&[df_d1, df_d2])?;
println!("{}", &df_diagonal_concat);
// --8<-- [end:cross]
Ok(())
Expand Down
59 changes: 39 additions & 20 deletions py-polars/polars/functions/eager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import contextlib
from functools import reduce
from itertools import chain
from typing import TYPE_CHECKING, Iterable, List, Sequence, cast
from typing import TYPE_CHECKING, Iterable, List, Sequence, cast, get_args

import polars._reexport as pl
from polars import functions as F
from polars.type_aliases import FrameType
from polars.type_aliases import ConcatMethod, FrameType
from polars.utils._wrap import wrap_df, wrap_expr, wrap_ldf, wrap_s
from polars.utils.various import ordered_unique

Expand All @@ -16,7 +16,7 @@

if TYPE_CHECKING:
from polars import DataFrame, Expr, LazyFrame, Series
from polars.type_aliases import ConcatMethod, JoinStrategy, PolarsType
from polars.type_aliases import JoinStrategy, PolarsType


def concat(
Expand All @@ -38,10 +38,12 @@ def concat(
LazyFrames do not support the `horizontal` strategy.

* vertical: Applies multiple `vstack` operations.
* vertical_relaxed: Applies multiple `vstack` operations and coerces column
dtypes that are not equal to their supertypes.
* vertical_relaxed: Same as `vertical`, but additionally coerces columns to
their common supertype *if* they are mismatched (eg: Int32 → Int64).
* diagonal: Finds a union between the column schemas and fills missing column
values with ``null``.
* diagonal_relaxed: Same as `diagonal`, but additionally coerces columns to
their common supertype *if* they are mismatched (eg: Int32 → Int64).
* horizontal: Stacks Series from DataFrames horizontally and fills with ``null``
if the lengths don't match.
* align: Combines frames horizontally, auto-determining the common key columns
Expand Down Expand Up @@ -175,41 +177,58 @@ def concat(
to_supertypes=True,
)
).collect(no_optimization=True)

elif how == "diagonal":
out = wrap_df(plr.diag_concat_df(elems))
out = wrap_df(plr.concat_df_diagonal(elems))
elif how == "diagonal_relaxed":
out = wrap_ldf(
plr.concat_lf_diagonal(
[df.lazy() for df in elems],
rechunk=rechunk,
parallel=parallel,
to_supertypes=True,
)
).collect(no_optimization=True)
elif how == "horizontal":
out = wrap_df(plr.hor_concat_df(elems))
out = wrap_df(plr.concat_df_horizontal(elems))
else:
allowed = ", ".join(repr(m) for m in get_args(ConcatMethod))
raise ValueError(
f"`how` must be one of {{'vertical','vertical_relaxed','diagonal','horizontal','align'}},"
f" got {how!r}"
f"DataFrame `how` must be one of {{{allowed}}}, got {how!r}"
)

elif isinstance(first, pl.LazyFrame):
if how == "vertical":
if how in ("vertical", "vertical_relaxed"):
return wrap_ldf(
plr.concat_lf(
elems, rechunk=rechunk, parallel=parallel, to_supertypes=False
elems,
rechunk=rechunk,
parallel=parallel,
to_supertypes=how.endswith("relaxed"),
)
)
if how == "vertical_relaxed":
elif how in ("diagonal", "diagonal_relaxed"):
return wrap_ldf(
plr.concat_lf(
elems, rechunk=rechunk, parallel=parallel, to_supertypes=True
plr.concat_lf_diagonal(
elems,
rechunk=rechunk,
parallel=parallel,
to_supertypes=how.endswith("relaxed"),
)
)
if how == "diagonal":
return wrap_ldf(
plr.diag_concat_lf(elems, rechunk=rechunk, parallel=parallel)
)
else:
allowed = ", ".join(
repr(m) for m in get_args(ConcatMethod) if m != "horizontal"
)
raise ValueError(
"'LazyFrame' only allows {'vertical','vertical_relaxed','diagonal','align'} concat strategies"
f"LazyFrame `how` must be one of {{{allowed}}}, got {how!r}"
)

elif isinstance(first, pl.Series):
if how == "vertical":
out = wrap_s(plr.concat_series(elems))
else:
raise ValueError("Series only allows {'vertical'} concat strategy")
raise ValueError("Series only supports 'vertical' concat strategy")

elif isinstance(first, pl.Expr):
return wrap_expr(plr.concat_expr([e._pyexpr for e in elems], rechunk))
Expand Down
7 changes: 6 additions & 1 deletion py-polars/polars/type_aliases.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,12 @@
# The following have no equivalent on the Rust side
Ambiguous: TypeAlias = Literal["earliest", "latest", "raise"]
ConcatMethod = Literal[
"vertical", "vertical_relaxed", "diagonal", "horizontal", "align"
"vertical",
"vertical_relaxed",
"diagonal",
"diagonal_relaxed",
"horizontal",
"align",
]
EpochTimeUnit = Literal["ns", "us", "ms", "s", "d"]
Orientation: TypeAlias = Literal["col", "row"]
Expand Down
8 changes: 4 additions & 4 deletions py-polars/src/functions/eager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub fn concat_series(series: &PyAny) -> PyResult<PySeries> {
}

#[pyfunction]
pub fn diag_concat_df(dfs: &PyAny) -> PyResult<PyDataFrame> {
pub fn concat_df_diagonal(dfs: &PyAny) -> PyResult<PyDataFrame> {
let iter = dfs.iter()?;

let dfs = iter
Expand All @@ -73,12 +73,12 @@ pub fn diag_concat_df(dfs: &PyAny) -> PyResult<PyDataFrame> {
})
.collect::<PyResult<Vec<_>>>()?;

let df = functions::diag_concat_df(&dfs).map_err(PyPolarsErr::from)?;
let df = functions::concat_df_diagonal(&dfs).map_err(PyPolarsErr::from)?;
Ok(df.into())
}

#[pyfunction]
pub fn hor_concat_df(dfs: &PyAny) -> PyResult<PyDataFrame> {
pub fn concat_df_horizontal(dfs: &PyAny) -> PyResult<PyDataFrame> {
let iter = dfs.iter()?;

let dfs = iter
Expand All @@ -88,6 +88,6 @@ pub fn hor_concat_df(dfs: &PyAny) -> PyResult<PyDataFrame> {
})
.collect::<PyResult<Vec<_>>>()?;

let df = functions::hor_concat_df(&dfs).map_err(PyPolarsErr::from)?;
let df = functions::concat_df_horizontal(&dfs).map_err(PyPolarsErr::from)?;
Ok(df.into())
}
17 changes: 15 additions & 2 deletions py-polars/src/functions/lazy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,12 @@ pub fn datetime(
}

#[pyfunction]
pub fn diag_concat_lf(lfs: &PyAny, rechunk: bool, parallel: bool) -> PyResult<PyLazyFrame> {
pub fn concat_lf_diagonal(
lfs: &PyAny,
rechunk: bool,
parallel: bool,
to_supertypes: bool,
) -> PyResult<PyLazyFrame> {
let iter = lfs.iter()?;

let lfs = iter
Expand All @@ -269,7 +274,15 @@ pub fn diag_concat_lf(lfs: &PyAny, rechunk: bool, parallel: bool) -> PyResult<Py
})
.collect::<PyResult<Vec<_>>>()?;

let lf = dsl::functions::diag_concat_lf(lfs, rechunk, parallel).map_err(PyPolarsErr::from)?;
let lf = dsl::functions::concat_lf_diagonal(
lfs,
UnionArgs {
rechunk,
parallel,
to_supertypes,
},
)
.map_err(PyPolarsErr::from)?;
Ok(lf.into())
}

Expand Down
8 changes: 4 additions & 4 deletions py-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ fn polars(py: Python, m: &PyModule) -> PyResult<()> {
.unwrap();
m.add_wrapped(wrap_pyfunction!(functions::eager::concat_series))
.unwrap();
m.add_wrapped(wrap_pyfunction!(functions::eager::diag_concat_df))
m.add_wrapped(wrap_pyfunction!(functions::eager::concat_df_diagonal))
.unwrap();
m.add_wrapped(wrap_pyfunction!(functions::eager::hor_concat_df))
m.add_wrapped(wrap_pyfunction!(functions::eager::concat_df_horizontal))
.unwrap();

// Functions - range
Expand Down Expand Up @@ -161,10 +161,10 @@ fn polars(py: Python, m: &PyModule) -> PyResult<()> {
.unwrap();
m.add_wrapped(wrap_pyfunction!(functions::lazy::datetime))
.unwrap();
m.add_wrapped(wrap_pyfunction!(functions::lazy::diag_concat_lf))
.unwrap();
m.add_wrapped(wrap_pyfunction!(functions::lazy::concat_expr))
.unwrap();
m.add_wrapped(wrap_pyfunction!(functions::lazy::concat_lf_diagonal))
.unwrap();
m.add_wrapped(wrap_pyfunction!(functions::lazy::dtype_cols))
.unwrap();
m.add_wrapped(wrap_pyfunction!(functions::lazy::duration))
Expand Down
Loading