[BUG] Fix partition spec bugs from old query planner (#1372)
Fixes partition specs that reference the wrong columns when a Projection changes the
available columns. This is the general case of
#596.

1. Partition specs now check whether the columns they reference still exist after the projection.
2. If they do, the spec is rewritten to reference the columns' new names (preferring the existing name where possible).
3. If any referenced column no longer exists, the partitioning is destroyed (the partition spec is set to Unknown), as illustrated in the sketch below.
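
A minimal sketch of the resulting behavior, mirroring the unit tests added to
src/daft-plan/src/logical_ops/project.rs in this commit (dummy_scan_node and
PartitionSpec::new_internal are crate-internal test helpers):

use common_error::DaftResult;
use daft_core::{datatypes::Field, DataType};
use daft_dsl::{col, Expr};

use crate::{logical_ops::Project, test::dummy_scan_node, PartitionScheme, PartitionSpec};

fn partition_spec_translation_sketch() -> DaftResult<()> {
    // Source is hash-partitioned by ("a", "b") into 3 partitions.
    let source = dummy_scan_node(vec![
        Field::new("a", DataType::Int64),
        Field::new("b", DataType::Int64),
    ])
    .repartition(
        3,
        vec![Expr::Column("a".into()), Expr::Column("b".into())],
        PartitionScheme::Hash,
    )?
    .build();

    // Case 1: both partition columns survive ("a" only under the new name "aa"),
    // so the spec is translated to reference col("aa") and col("b").
    let preserved = Project::try_new(
        source.clone(),
        vec![col("a").alias("aa"), col("b")],
        Default::default(),
    )?;
    assert_eq!(
        preserved.partition_spec.as_ref().clone(),
        PartitionSpec::new_internal(PartitionScheme::Hash, 3, Some(vec![col("aa"), col("b")]))
    );

    // Case 2: "a" is dropped by the projection, so the partitioning is destroyed
    // (scheme reset to Unknown, partition-by expressions cleared).
    let destroyed = Project::try_new(source, vec![col("b")], Default::default())?;
    assert_eq!(
        destroyed.partition_spec.as_ref().clone(),
        PartitionSpec::new_internal(PartitionScheme::Unknown, 3, None)
    );

    Ok(())
}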

---------

Co-authored-by: Xiayue Charles Lin <[email protected]>
xcharleslin and Xiayue Charles Lin committed Sep 14, 2023
1 parent 0c6b8a9 commit cda7e4a
Showing 5 changed files with 285 additions and 20 deletions.
280 changes: 277 additions & 3 deletions src/daft-plan/src/logical_ops/project.rs
@@ -8,7 +8,7 @@ use snafu::ResultExt;

use crate::logical_plan::{CreationSnafu, Result};
use crate::optimization::Transformed;
use crate::{LogicalPlan, ResourceRequest};
use crate::{LogicalPlan, PartitionScheme, PartitionSpec, ResourceRequest};

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Project {
@@ -17,6 +17,7 @@ pub struct Project {
pub projection: Vec<Expr>,
pub resource_request: ResourceRequest,
pub projected_schema: SchemaRef,
pub partition_spec: Arc<PartitionSpec>,
}

impl Project {
@@ -38,14 +39,182 @@ impl Project {
.context(CreationSnafu)?;
Schema::new(fields).context(CreationSnafu)?.into()
};
let partition_spec =
Self::translate_partition_spec(factored_input.partition_spec(), &factored_projection);
Ok(Self {
input: factored_input,
projection: factored_projection,
resource_request,
projected_schema,
partition_spec,
})
}

pub fn multiline_display(&self) -> Vec<String> {
vec![
format!(
"Project: {}",
self.projection
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", ")
),
format!("Partition spec = {:?}", self.partition_spec),
]
}

fn translate_partition_spec(
input_pspec: Arc<PartitionSpec>,
projection: &Vec<Expr>,
) -> Arc<PartitionSpec> {
// Given an input partition spec, and a new projection,
// produce the new partition spec.

use crate::PartitionScheme::*;
match input_pspec.scheme {
// If the scheme is vacuous, the result partition spec is the same.
Random | Unknown => input_pspec.clone(),
// Otherwise, need to reevaluate the partition scheme for each expression.
Range | Hash => {
// See what columns the projection directly translates into new columns.
let mut old_colname_to_new_colname = IndexMap::new();
for expr in projection {
if let Some(oldname) = expr.input_mapping() {
let newname = expr.name().unwrap().to_string();
// Add the oldname -> newname mapping,
// but don't overwrite any existing identity mappings (e.g. "a" -> "a").
if old_colname_to_new_colname.get(&oldname) != Some(&oldname) {
old_colname_to_new_colname.insert(oldname, newname);
}
}
}

// Then, see if we can fully translate the partition spec.
let maybe_new_pspec = input_pspec
.by
.as_ref()
.unwrap()
.iter()
.map(|e| Self::translate_partition_spec_expr(e, &old_colname_to_new_colname))
.collect::<std::result::Result<Vec<_>, _>>();
maybe_new_pspec.map_or_else(
|()| {
PartitionSpec::new_internal(
PartitionScheme::Unknown,
input_pspec.num_partitions,
None,
)
.into()
},
|new_pspec: Vec<Expr>| {
PartitionSpec::new_internal(
input_pspec.scheme.clone(),
input_pspec.num_partitions,
Some(new_pspec),
)
.into()
},
)
}
}
}

fn translate_partition_spec_expr(
pspec_expr: &Expr,
old_colname_to_new_colname: &IndexMap<String, String>,
) -> std::result::Result<Expr, ()> {
// Given a single expression of an input partition spec,
// translate it to a new expression in the given projection.
// Returns:
// - Ok(expr) with expr being the translation, or
// - Err(()) if no translation is possible in the new projection.

match pspec_expr {
Expr::Column(name) => match old_colname_to_new_colname.get(name.as_ref()) {
Some(newname) => Ok(Expr::Column(newname.as_str().into())),
None => Err(()),
},
Expr::Literal(_) => Ok(pspec_expr.clone()),
Expr::Alias(child, name) => {
let newchild = Self::translate_partition_spec_expr(
child.as_ref(),
old_colname_to_new_colname,
)?;
Ok(Expr::Alias(newchild.into(), name.clone()))
}
Expr::BinaryOp { op, left, right } => {
let newleft =
Self::translate_partition_spec_expr(left.as_ref(), old_colname_to_new_colname)?;
let newright = Self::translate_partition_spec_expr(
right.as_ref(),
old_colname_to_new_colname,
)?;
Ok(Expr::BinaryOp {
op: *op,
left: newleft.into(),
right: newright.into(),
})
}
Expr::Cast(child, dtype) => {
let newchild = Self::translate_partition_spec_expr(
child.as_ref(),
old_colname_to_new_colname,
)?;
Ok(Expr::Cast(newchild.into(), dtype.clone()))
}
Expr::Function { func, inputs } => {
let new_inputs = inputs
.iter()
.map(|e| Self::translate_partition_spec_expr(e, old_colname_to_new_colname))
.collect::<Result<Vec<_>, _>>()?;
Ok(Expr::Function {
func: func.clone(),
inputs: new_inputs,
})
}
Expr::Not(child) => {
let newchild = Self::translate_partition_spec_expr(
child.as_ref(),
old_colname_to_new_colname,
)?;
Ok(Expr::Not(newchild.into()))
}
Expr::IsNull(child) => {
let newchild = Self::translate_partition_spec_expr(
child.as_ref(),
old_colname_to_new_colname,
)?;
Ok(Expr::IsNull(newchild.into()))
}
Expr::IfElse {
if_true,
if_false,
predicate,
} => {
let newtrue = Self::translate_partition_spec_expr(
if_true.as_ref(),
old_colname_to_new_colname,
)?;
let newfalse = Self::translate_partition_spec_expr(
if_false.as_ref(),
old_colname_to_new_colname,
)?;
let newpred = Self::translate_partition_spec_expr(
predicate.as_ref(),
old_colname_to_new_colname,
)?;
Ok(Expr::IfElse {
if_true: newtrue.into(),
if_false: newfalse.into(),
predicate: newpred.into(),
})
}
// Cannot have agg exprs in partition specs.
Expr::Agg(_) => Err(()),
}
}

fn try_factor_subexpressions(
input: Arc<LogicalPlan>,
projection: Vec<Expr>,
@@ -345,9 +514,11 @@ fn replace_column_with_semantic_id_aggexpr(
mod tests {
use common_error::DaftResult;
use daft_core::{datatypes::Field, DataType};
use daft_dsl::{binary_op, col, lit, Operator};
use daft_dsl::{binary_op, col, lit, Expr, Operator};

use crate::{logical_ops::Project, test::dummy_scan_node, LogicalPlan};
use crate::{
logical_ops::Project, test::dummy_scan_node, LogicalPlan, PartitionScheme, PartitionSpec,
};

/// Test that nested common subexpressions are correctly split
/// into multiple levels of projections.
Expand Down Expand Up @@ -456,4 +627,107 @@ mod tests {

Ok(())
}

/// Test that projections preserving column inputs, even through aliasing,
/// do not destroy the partition spec.
#[test]
fn test_partition_spec_preserving() -> DaftResult<()> {
let source = dummy_scan_node(vec![
Field::new("a", DataType::Int64),
Field::new("b", DataType::Int64),
Field::new("c", DataType::Int64),
])
.repartition(
3,
vec![Expr::Column("a".into()), Expr::Column("b".into())],
PartitionScheme::Hash,
)?
.build();

let expressions = vec![
(col("a") % lit(2)), // this is now "a"
col("b"),
col("a").alias("aa"),
];

let result_projection = Project::try_new(source, expressions, Default::default())?;

let expected_pspec =
PartitionSpec::new_internal(PartitionScheme::Hash, 3, Some(vec![col("aa"), col("b")]));

assert_eq!(
expected_pspec,
result_projection.partition_spec.as_ref().clone()
);

Ok(())
}

/// Test that projections destroying even a single column input from the partition spec
/// destroys the entire partition spec.
#[test]
fn test_partition_spec_destroying() -> DaftResult<()> {
let source = dummy_scan_node(vec![
Field::new("a", DataType::Int64),
Field::new("b", DataType::Int64),
Field::new("c", DataType::Int64),
])
.repartition(
3,
vec![Expr::Column("a".into()), Expr::Column("b".into())],
PartitionScheme::Hash,
)?
.build();

let expected_pspec = PartitionSpec::new_internal(PartitionScheme::Unknown, 3, None);

let test_cases = vec![
vec![col("a"), col("c").alias("b")], // original "b" is gone even though "b" is present
vec![col("b")], // original "a" dropped
vec![col("a") % lit(2), col("b")], // original "a" gone
vec![col("c")], // everything gone
];

for projection in test_cases {
let result_projection =
Project::try_new(source.clone(), projection, Default::default())?;
assert_eq!(
expected_pspec,
result_projection.partition_spec.as_ref().clone()
);
}

Ok(())
}

/// Test that new partition specs prefer existing column names over new ones,
/// i.e. ("a", "a" as "b") remains partitioned by "a", not "b".
#[test]
fn test_partition_spec_prefer_existing_names() -> DaftResult<()> {
let source = dummy_scan_node(vec![
Field::new("a", DataType::Int64),
Field::new("b", DataType::Int64),
Field::new("c", DataType::Int64),
])
.repartition(
3,
vec![Expr::Column("a".into()), Expr::Column("b".into())],
PartitionScheme::Hash,
)?
.build();

let expressions = vec![col("a").alias("y"), col("a"), col("a").alias("z"), col("b")];

let result_projection = Project::try_new(source, expressions, Default::default())?;

let expected_pspec =
PartitionSpec::new_internal(PartitionScheme::Hash, 3, Some(vec![col("a"), col("b")]));

assert_eq!(
expected_pspec,
result_projection.partition_spec.as_ref().clone()
);

Ok(())
}
}
13 changes: 2 additions & 11 deletions src/daft-plan/src/logical_plan.rs
@@ -125,7 +125,7 @@ impl LogicalPlan {
pub fn partition_spec(&self) -> Arc<PartitionSpec> {
match self {
Self::Source(Source { partition_spec, .. }) => partition_spec.clone(),
Self::Project(Project { input, .. }) => input.partition_spec(),
Self::Project(Project { partition_spec, .. }) => partition_spec.clone(),
Self::Filter(Filter { input, .. }) => input.partition_spec(),
Self::Limit(Limit { input, .. }) => input.partition_spec(),
Self::Explode(Explode { input, .. }) => input.partition_spec(),
@@ -283,16 +283,7 @@ impl LogicalPlan {
pub fn multiline_display(&self) -> Vec<String> {
match self {
Self::Source(source) => source.multiline_display(),
Self::Project(Project { projection, .. }) => {
vec![format!(
"Project: {}",
projection
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", ")
)]
}
Self::Project(projection) => projection.multiline_display(),
Self::Filter(Filter { predicate, .. }) => vec![format!("Filter: {predicate}")],
Self::Limit(Limit { limit, .. }) => vec![format!("Limit: {limit}")],
Self::Explode(Explode { to_explode, .. }) => {
2 changes: 1 addition & 1 deletion src/daft-plan/src/optimization/optimizer.rs
@@ -535,7 +535,7 @@ mod tests {
assert_eq!(pass_count, 6);
let expected = "\
Filter: [[[col(a) < lit(2)] | lit(false)] | lit(false)] & lit(true)\
\n Project: col(a) + lit(3) AS c, col(a) + lit(1), col(a) + lit(2) AS b\
\n Project: col(a) + lit(3) AS c, col(a) + lit(1), col(a) + lit(2) AS b, Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\
\n Source: Json, File paths = [/foo], File schema = a (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64)";
assert_eq!(opt_plan.repr_indent(), expected);
Ok(())
8 changes: 4 additions & 4 deletions src/daft-plan/src/optimization/rules/push_down_filter.rs
@@ -277,7 +277,7 @@ mod tests {
.filter(col("a").lt(&lit(2)))?
.build();
let expected = "\
Project: col(a)\
Project: col(a), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\
\n Filter: col(a) < lit(2)\
\n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)";
assert_optimized_plan_eq(plan, expected)?;
@@ -295,7 +295,7 @@ mod tests {
.filter(col("a").lt(&lit(2)).and(&col("b").eq(&lit("foo"))))?
.build();
let expected = "\
Project: col(a), col(b)\
Project: col(a), col(b), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\
\n Filter: [col(a) < lit(2)] & [col(b) == lit(\"foo\")]\
\n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)";
assert_optimized_plan_eq(plan, expected)?;
@@ -316,7 +316,7 @@ mod tests {
// Filter should NOT commute with Project, since this would involve redundant computation.
let expected = "\
Filter: col(a) < lit(2)\
\n Project: col(a) + lit(1)\
\n Project: col(a) + lit(1), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\
\n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)";
assert_optimized_plan_eq(plan, expected)?;
Ok(())
@@ -336,7 +336,7 @@ mod tests {
.filter(col("a").lt(&lit(2)))?
.build();
let expected = "\
Project: col(a) + lit(1)\
Project: col(a) + lit(1), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\
\n Filter: [col(a) + lit(1)] < lit(2)\
\n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)";
assert_optimized_plan_eq(plan, expected)?;