[VL] Fix RoundRobinPartitioner by setting start partition id #3842
Conversation
Thanks for opening a pull request! Could you open an issue for this pull request on GitHub Issues? https://github.com/oap-project/gluten/issues Then could you also rename the commit message and pull request title in the following format?
Run Gluten Clickhouse CI
LGTM.
cc @marin-ma
    val startPartitionId = dep.nativePartitioning.getShortName match {
      case "rr" =>
        new XORShiftRandom(context.partitionId())
          .nextInt(dep.partitioner.numPartitions)
      case _ => 0
    }
@zhli1142015, thank you for the fix. Can we move this code block into NativePartitioning? It could then be reused by other shuffle services.
Moved this to shuffleUtils, thanks.
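A minimal sketch of what such a shared helper could look like (the object name and method signature here are assumptions for illustration, not necessarily the PR's actual code):

    import org.apache.spark.util.random.XORShiftRandom

    // Hypothetical shared helper: computes the start partition id once per map
    // task, mirroring Spark's round-robin seeding. Non-round-robin modes start at 0.
    object ShuffleUtils {
      def getStartPartitionId(shortName: String, taskPartitionId: Int, numPartitions: Int): Int =
        shortName match {
          case "rr" => new XORShiftRandom(taskPartitionId).nextInt(numPartitions)
          case _ => 0
        }
    }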
    switch (partitioning) {
      case Partitioning::kHash:
        return std::make_shared<HashPartitioner>(numPartitions);
      case Partitioning::kRoundRobin:
    -   return std::make_shared<RoundRobinPartitioner>(numPartitions);
    +   return std::make_shared<RoundRobinPartitioner>(numPartitions, startPartitionId);
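For intuition, round-robin partitioning with a start offset behaves roughly as in the following sketch (written in Scala for illustration; the real implementation is Gluten's C++ RoundRobinPartitioner):

    // Sketch: a round-robin counter that begins at startPartitionId instead of 0,
    // so different map tasks spread their first rows across different reducers.
    class RoundRobinSketch(numPartitions: Int, startPartitionId: Int) {
      private var pid = startPartitionId
      def next(): Int = {
        val current = pid
        pid = (pid + 1) % numPartitions
        current
      }
    }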
Thanks so much for your fix! Since startPartitionId is only required by RoundRobinPartitioner, can we move the code that generates it from the Scala side to the native side, i.e., into RoundRobinPartitioner? That may be a cleaner way if feasible. Thanks!
Thanks for your comments. Yes, I initially also looked at implementing this in RoundRobinPartitioner, but it looks like there is no native implementation of XORShiftRandom. Please point me to one if it exists. Besides, if we moved this logic to the native side we would still need to add a parameter to pass the partition id from the JVM to native code. As it stands, the native-side implementation is more like HashPartitioner, where getPartitionKeyExtractor is executed in the JVM.
Yes, there is no native XORShiftRandom in Gluten's cpp code. Since it's just a simple computation of a starting value, we don't need to bother maintaining the same logic on the native side. However, the current argument list does look cumbersome. To make it cleaner, we should find a better way to separate the parameters for the native ShuffleWriter and Partitioner, perhaps in a subsequent patch.
Thanks for the clarifications. The current change is OK to me. BTW, I am wondering whether XORShiftRandom must be used, or whether a replacement would work (I'm not suggesting changing it).
This impacts which partition each row is placed in, so I think it's better to use the same algorithm as Spark. It may also be required when SORT_BEFORE_REPARTITION is true. That said, there looks to be another issue: we currently just ignore that setting, so in the future we may need to add a local sort before the round-robin shuffle when SORT_BEFORE_REPARTITION == true.
https://github.com/apache/spark/blob/e1a2255f99be88e776295f30f995b339c3e4b5af/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L3367
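For reference, the Spark config behind SORT_BEFORE_REPARTITION is spark.sql.execution.sortBeforeRepartition (default true); a quick way to toggle it from a Spark session:

    // When enabled, Spark sorts rows locally before a round-robin repartition so
    // that retried map tasks emit rows in the same order, keeping the shuffle
    // output deterministic across task retries.
    spark.conf.set("spark.sql.execution.sortBeforeRepartition", true)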
Run Gluten Clickhouse CI
👍
===== Performance report for TPCH SF2000 with Velox backend, for reference only =====
What changes were proposed in this pull request?
    spark.range(100).withColumn("pCol", 'id % 2).repartition(10).write.format("parquet").mode("overwrite").save("/tmp/veloxtest7")

This query should generate 10 files under the directory /tmp/veloxtest7, but in my local shell only 7 files are generated. My machine has 16 cores, so the preceding stage has 16 partitions with about 6 rows each; because every task started its round-robin at partition 0, each task only ever reached partitions 0 through 6, so data landed in the first 7 output partitions only.
RoundRobinPartitioner always started with 0 as the selected partition id in every task, which is not expected. This change aligns with Spark's behavior by setting the start partition id to XorShift(partitionId) % numPartitions.
https://github.com/apache/spark/blob/ef27b9b15687dad416b6353409b1b44bc1451885/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L322
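For context, the referenced Spark logic picks a pseudo-random starting position per map task and increments it per row. A paraphrased, simplified sketch (not a verbatim copy; context, numPartitions, and InternalRow are as in the linked file):

    // Paraphrased from Spark's getPartitionKeyExtractor for RoundRobinPartitioning:
    // each map task seeds XORShiftRandom with its own partition id; the downstream
    // partitioner later applies the modulo by numPartitions.
    var position = new XORShiftRandom(context.partitionId()).nextInt(numPartitions)
    (row: InternalRow) => {
      position += 1
      position
    }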
How was this patch tested?
UT.