
[CORE] Expand project when subplan is aggregation #3440

Open
loneylee opened this issue Oct 19, 2023 · 5 comments
Labels
bug Something isn't working triage

Comments

@loneylee
Member

loneylee commented Oct 19, 2023

Backend

CH (ClickHouse)

Bug description

In the Spark rule org.apache.spark.sql.catalyst.optimizer.CollapseProject, a Project over an Aggregate is collapsed into a single Aggregate:

plan.transformUpWithPruning(_.containsPattern(PROJECT), ruleId) {
  // ...
  case p @ Project(_, agg: Aggregate)
      if canCollapseExpressions(p.projectList, agg.aggregateExpressions, alwaysInline) &&
         canCollapseAggregate(p, agg) =>
    agg.copy(aggregateExpressions = buildCleanedProjectList(
      p.projectList, agg.aggregateExpressions))
   // ...
}

In this example, ObjectHashAggregate falls back, because the split expression is currently not supported.

# optimize plan
Aggregate [l_orderkey#978L], [split(cast(l_orderkey#978L as string), ,, 1) AS split(a, ,, 1)#996, size(collect_set(l_shipdate#988, 0, 0), true) AS b#977]
+- Project [l_orderkey#978L, l_shipdate#988]
   +- Relation default.lineitem[l_orderkey#978L,l_partkey#979L,l_suppkey#980L,l_linenumber#981L...

# gluten plan
ObjectHashAggregate(keys=[l_orderkey#978L], functions=[collect_set(l_shipdate#988, 0, 0)], output=[split(a, ,, 1)#996, b#977])
+- CHNativeColumnarToRow
   +- ColumnarExchange hashpartitioning(l_orderkey#978L, 5), ENSURE_REQUIREMENTS, [plan_id=550], [id=#550], [OUTPUT] ArrayBuffer(l_orderkey:LongType, buf:BinaryType), [OUTPUT] ArrayBuffer(l_orderkey:LongType, buf:BinaryType)
      +- HashAggregateTransformer(keys=[l_orderkey#978L], functions=[partial_collect_set(l_shipdate#988, 0, 0)], output=[l_orderkey#978L, buf#1000])
         +- NativeFileNativeScan parquet default.lineitem[l_orderkey#978L,l_shipdate#988] Batched: true, DataFilters: [], Format: Parquet, Location ...

This is a bad fallback: the partial aggregate plan does not fall back, and in the CH backend the partial aggregate emits its own internal struct data, which columnar-to-row conversion does not support.

In the Gluten code, transforming the final aggregate plan to a Substrait plan applies the function applyPostProjection, which generates two RelNodes, a project and an aggRel, for the backend.
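The splitting that applyPostProjection performs can be sketched roughly as follows. This is a simplified standalone model, not the actual Catalyst or Gluten API: the Expr hierarchy, the split helper, and the generated `_aggN` attribute names are all illustrative assumptions. The idea is that aggregate calls and plain attributes stay on the aggregate side, while any wrapping function (like split or size here) moves into a post-projection that refers to the aggregate output by reference.

```scala
// Hypothetical simplified expression model; not the real Catalyst classes.
sealed trait Expr
case class Attr(name: String) extends Expr                  // a column reference
case class AggCall(fn: String, arg: Expr) extends Expr      // e.g. collect_set(...)
case class FnCall(fn: String, args: Seq[Expr]) extends Expr // e.g. split(...), size(...)

// Split each final-aggregate output expression into an aggregate-side
// expression and a project-side expression. Wrapping functions are lowered
// to references (_agg0, _agg1, ...) into the aggregate's output.
def split(outputs: Seq[Expr]): (Seq[Expr], Seq[Expr]) = {
  var aggSide = Vector.empty[Expr]
  def lower(e: Expr): Expr = e match {
    case a: Attr =>
      aggSide :+= a; a
    case ac: AggCall =>
      val ref = Attr(s"_agg${aggSide.size}")
      aggSide :+= ac
      ref
    case FnCall(fn, args) => FnCall(fn, args.map(lower))
  }
  val projSide = outputs.map(lower)
  (aggSide.distinct, projSide)
}
```

On the example from this issue, split(cast(l_orderkey)) and size(collect_set(l_shipdate)) would be lowered so that the aggregate emits only l_orderkey and collect_set(l_shipdate), and the post-projection applies split and size on top.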

Can we expand HashAggregate into Project + HashAggregate before AddTransformHintRule? It would generate a plan like:

# before expand
ObjectHashAggregate(keys=[l_orderkey#978L], functions=[collect_set(l_shipdate#988, 0, 0)], output=[split(a, ,, 1)#996, b#977])
+- Exchange hashpartitioning(l_orderkey#978L, 5), ENSURE_REQUIREMENTS, [plan_id=542]
   +- ObjectHashAggregate(keys=[l_orderkey#978L], functions=[partial_collect_set(l_shipdate#988, 0, 0)], output=[l_orderkey#978L, buf#1000])
      +- FileScan parquet default.lineitem[l_orderkey#978L,l_shipdate#988] Batched: true, DataFilters: [], Format: Parquet, Location: ...

# after expand
Project(output=[split(l_orderkey#978L, ,, 1)#996, b#977])   # new plan
  ObjectHashAggregate(keys=[l_orderkey#978L], functions=[collect_set(l_shipdate#988, 0, 0)], output=[l_orderkey#978L, collect_set(l_shipdate#988, 0, 0)])
  +- Exchange hashpartitioning(l_orderkey#978L, 5), ENSURE_REQUIREMENTS, [plan_id=542]
     +- ObjectHashAggregate(keys=[l_orderkey#978L], functions=[partial_collect_set(l_shipdate#988, 0, 0)], output=[l_orderkey#978L, buf#1000])
        +- FileScan parquet default.lineitem[l_orderkey#978L,l_shipdate#988] Batched: true, DataFilters: [], Format: Parquet, Location: ...

If the Project contains an unsupported expression, we can fall back only the Project.
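The proposed expansion could be sketched as a plan rewrite like the one below. Again this is a simplified standalone model under assumed names (PExpr, Agg, Proj, expand), not Spark's SparkPlan or Gluten's rule API: when any aggregate output expression wraps an aggregate result in a plain function, the wrapping is pulled up into a new Project and the aggregate is left emitting only keys and raw aggregate results.

```scala
// Hypothetical simplified plan model; not the real SparkPlan classes.
sealed trait PExpr
case class Col(name: String) extends PExpr
case class AggResult(fn: String, col: String) extends PExpr // e.g. collect_set(l_shipdate)
case class Wrap(fn: String, args: Seq[PExpr]) extends PExpr // e.g. split(...), size(...)

sealed trait Plan
case class Scan(table: String) extends Plan
case class Agg(keys: Seq[String], output: Seq[PExpr], child: Plan) extends Plan
case class Proj(output: Seq[PExpr], child: Plan) extends Plan

// Expand Agg into Proj + Agg when any output expression wraps an
// aggregate result; the Agg keeps only the bare, unwrapped expressions.
def expand(plan: Plan): Plan = plan match {
  case Agg(keys, output, child) if output.exists(_.isInstanceOf[Wrap]) =>
    val bare = output.flatMap {
      case Wrap(_, args) => args // keep the wrapped inputs (one nesting level)
      case other         => Seq(other)
    }.distinct
    Proj(output, Agg(keys, bare, child))
  case other => other
}
```

Applied before the transform-hint rule, this would let the new Project fall back on its own when it contains an unsupported expression, while the aggregate stays native.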

Spark version

None

Spark configurations

No response

System information

No response

Relevant logs

No response

@loneylee loneylee added bug Something isn't working triage labels Oct 19, 2023
@loneylee
Member Author

@zhztheplayer @rui-mo @PHILO-HE
Do you have any suggestions?

@PHILO-HE
Contributor

FYI, similar to #3391.

@PHILO-HE
Contributor

Do you mean disable collapsing agg + project for any plan? Or disable it only when the post-project is not transformable? Thanks!

@loneylee
Member Author

loneylee commented Oct 20, 2023

Do you mean disable collapsing agg + project for any plan? Or disable it only when the post-project is not transformable? Thanks!

Disable collapsing agg + project for any plan.

@PHILO-HE
Contributor

I am not sure how much perf can be gained from the collapse optimizer in vanilla Spark. If we disable collapsing agg + project for any plan, it may be better to also consider the perf impact of the agg fallback + project fallback case. Could you run a test and a perf comparison for vanilla Spark by turning the below config on/off? Thanks! cc @zhztheplayer

--conf spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.CollapseProject
