-
Notifications
You must be signed in to change notification settings - Fork 434
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GLUTEN-3486][CH] Fix AQE cannot coalesce shuffle partitions #3941
Conversation
Run Gluten Clickhouse CI |
LGTM |
import org.apache.spark.sql.catalyst.expressions.Expression | ||
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning | ||
|
||
class HashPartitioningWrapper( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add some description for the functionality of this class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
@@ -20,6 +20,7 @@ import io.glutenproject.GlutenConfig | |||
import io.glutenproject.backendsapi.BackendsApiManager | |||
import io.glutenproject.execution._ | |||
import io.glutenproject.extension.{GlutenPlan, ValidationResult} | |||
import io.glutenproject.sql.shims.SparkShimLoader |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Drop this change?
(projectExpressions.size, HashPartitioning(newExprs, numPartitions), project) | ||
( | ||
projectExpressions.size, | ||
new HashPartitioningWrapper(exprs, newExprs, numPartitions), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I get it. AQE won't touch the expressions in hash partitioning, then how can it affect AQE behavior ? Does it also affect range partitioning?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://github.com/apache/spark/blob/b9cc813adf0ba592f020bc9778b373800ad81b8f/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L180C30-L180C30 here ValidateRequirements.validate(applied, distribution)
will validate failed due to shuffle hash expressions changed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, and these codes only be used in ch backend so Velox backend does not have this issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks reasonable to me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, and these codes only be used in ch backend so Velox backend does not have this issue.
yes
Run Gluten Clickhouse CI |
Run Gluten Clickhouse CI |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me! Thanks!
===== Performance report for TPCH SF2000 with Velox backend, for reference only ====
|
What changes were proposed in this pull request?
When there are non-field expressions in shuffle hash expression, we add a pre-project before shuffle exchange operator and change shuffle hash expression to field expression
projected_partitioning_value
which prevents AQE'sCoalesceShufflePartitions
andOptimizeSkewedJoin
rules from being effective. This pr introduces aHashPartitioningWrapper
in order to remain original shuffle hash expressions.(Fixes: #3486)
How was this patch tested?
manual tests