-
Notifications
You must be signed in to change notification settings - Fork 435
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[VL] Support StreamingAggregate if child output ordering is satisfied #3828
Conversation
Thanks for opening a pull request! Could you open an issue for this pull request on Github Issues? https://github.com/oap-project/gluten/issues Then could you also rename commit message and pull request title in the following format?
See also: |
Run Gluten Clickhouse CI |
1f11c45
to
68dd54d
Compare
Run Gluten Clickhouse CI |
.saveAsTable("t") | ||
|
||
try { | ||
doBenchmark() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems to me that we don't check query result in the benchmark code. So should we add some simple UT code with result check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, I will add a test later
68dd54d
to
db8201f
Compare
Run Gluten Clickhouse CI |
cc @zhztheplayer @rui-mo @PHILO-HE it's ready to review, thank you |
/Benchmark Velox |
===== Performance report for TPCH SF2000 with Velox backend, for reference only ====
|
/Benchmark Velox |
===== Performance report for TPCH SF2000 with Velox backend, for reference only ====
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks.
agg.child.outputOrdering | ||
case _ => child.outputOrdering | ||
} | ||
val requiredOrdering = groupingExpressions.map(expr => SortOrder.apply(expr, Ascending)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why ascending by default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It follows the default behavior of SMJ requiredChildOrdering. see https://github.com/apache/spark/blob/ef27b9b15687dad416b6353409b1b44bc1451885/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L81-L92
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does that mean it only fits with SMJ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC Ascending
is the Spark default ordering, all the bulit-in SQL operators follow it. e.g., SMJ, Window, SortAggregate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it by Spark's limitation that a plan node can't require "asc or desc" child ordering at the same time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, it's a kind of hard code, but so far, I did not see any requirements about changing it. Will Velox operators require or introduce Descending ordering ?
@@ -976,7 +976,9 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::AggregateRel& ag | |||
if (aggRel.has_advanced_extension()) { | |||
std::vector<TypePtr> types; | |||
const auto& extension = aggRel.advanced_extension(); | |||
if (!validateInputTypes(extension, types)) { | |||
// Aggregate always has advanced extension for steaming aggregate optimization, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: steaming -> streaming
Run Gluten Clickhouse CI |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! It's already in good shape in my opinion.
val childOrdering = child match { | ||
case agg: HashAggregateExecBaseTransformer | ||
if agg.groupingExpressions == this.groupingExpressions => | ||
agg.child.outputOrdering |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why propagating child's child's ordering here? It would be great if we could also leave some comments in code. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added some comments, hope it is clear now
protected def isGroupingKeysPreGrouped: Boolean = { | ||
if (!conf.getConf(GlutenConfig.COLUMNAR_PREFER_STREAMING_AGGREGATE)) { | ||
return false | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The check doesn't do as the method name isGroupingKeysPreGrouped
said?
I would suggest renaming isGroupingKeysPreGrouped
to something like isCapableForStreamingAggregation
, or just moving this config check out.
Run Gluten Clickhouse CI |
===== Performance report for TPCH SF2000 with Velox backend, for reference only ====
|
What changes were proposed in this pull request?
If the child output ordering satisfies the grouping keys then the grouping keys are pregrouped, we can choose to use
StreamingAggregate
. Velox supportsStreamingAggregate
which requires the input is pregrouped, soStreamingAggregate
could use less memory to do aggregate as it does not need to hold all groups in memory and it could avoid spill.For example, the query can hit this optimization if the join is SMJ:
How was this patch tested?
manually benchmark