Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] Fix RoundRobinPartitioner by setting start partition id #3842

Merged
merged 10 commits into from
Nov 27, 2023

Conversation

zhli1142015
Copy link
Contributor

@zhli1142015 zhli1142015 commented Nov 24, 2023

What changes were proposed in this pull request?

spark.range(100).withColumn("pCol", 'id % 2).repartition(10).write.format("parquet").mode("overwrite").save("/tmp/veloxtest7")

This query should generate 10 files under directory /tmp/veloxtest7, but in my local shell, only 7 files are generated (16 cores in my machine, so there are 16 partitions in before stage and average 6 rows each partition, this resulted that data are partitioned to the first 7 partitions only).
image

RoundRobinPartitioner always start with 0 as selected partition id for all partitions, this is not expected. Make this change to align with spark behavior: to set start partition id as XorShift(partitionId) % numPartitions.
https://github.com/apache/spark/blob/ef27b9b15687dad416b6353409b1b44bc1451885/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L322

How was this patch tested?

UT.

Copy link

Thanks for opening a pull request!

Could you open an issue for this pull request on Github Issues?

https://github.com/oap-project/gluten/issues

Then could you also rename commit message and pull request title in the following format?

[GLUTEN-${ISSUES_ID}][COMPONENT]feat/fix: ${detailed message}

See also:

Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

@zhli1142015 zhli1142015 changed the title [WIP][VL] fix RoundRobinPartitioner by setting seed pre partition [VL] fix RoundRobinPartitioner by setting start partition id Nov 27, 2023
@zhli1142015
Copy link
Contributor Author

@PHILO-HE and @JkSelf , could you please review this PR?

@zhli1142015 zhli1142015 changed the title [VL] fix RoundRobinPartitioner by setting start partition id [VL] Fix RoundRobinPartitioner by setting start partition id Nov 27, 2023
Copy link
Contributor

@JkSelf JkSelf left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@JkSelf
Copy link
Contributor

JkSelf commented Nov 27, 2023

cc @marin-ma

Comment on lines 76 to 81
val startPartitionId = dep.nativePartitioning.getShortName match {
case "rr" =>
new XORShiftRandom(context.partitionId())
.nextInt(dep.partitioner.numPartitions)
case _ => 0
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhli1142015 thank you for the fix. Can we move this code block into NativePartitioning ? It should also be reused for other shuffle service.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this to shuffleUtils, thanks.

switch (partitioning) {
case Partitioning::kHash:
return std::make_shared<HashPartitioner>(numPartitions);
case Partitioning::kRoundRobin:
return std::make_shared<RoundRobinPartitioner>(numPartitions);
return std::make_shared<RoundRobinPartitioner>(numPartitions, startPartitionId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks so much for your fix!
As just required by RoundRobinPartitioner, can we move the code of generating startPartitionId from scala side to native side, i.e., into RoundRobinPartitioner? It may be a cleaner way if feasible. Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your comments. Yes, I'm firstly also looking to implement this in RoundRobinPartitioner, but looks there is no native implement of XORShiftRandom. Please point me if there is.
Besides if we move this logic to native side we still need to add a parameter to pass partition id from JVM to native.
Now the native side implment is more like the HashPartitioner, and getPartitionKeyExtractor are executed in JVM.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. There is no native XORShiftRandom in Gluten cpp. Since it's just a simple computation of a starting value, we don't need to bother maintaining the same logic on native side. However, the current argument list does look cumbersome. To make it cleaner, we should find a better way to separate parameters for the native ShuffleWriter and Partitioner, perhaps in a subsequent patch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clarifications. The current change is OK to me.
BTW, I am wondering whether XORShiftRandom must be used or can use a replacement (I'm not suggesting to change it).

Copy link
Contributor Author

@zhli1142015 zhli1142015 Nov 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This impacts the partition that rows are placed, I think it's better to use the same algorithm with spark. And it may be required if "SORT_BEFORE_REPARTITION" is true. But there looks another issue, as now we just ignore the setting, we may add local sort before round robin shuffle if SORT_BEFORE_REPARTITION == true, in future.
https://github.com/apache/spark/blob/e1a2255f99be88e776295f30f995b339c3e4b5af/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L3367

Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

Copy link
Contributor

@zhouyuan zhouyuan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@marin-ma marin-ma merged commit e6dd56e into apache:main Nov 27, 2023
18 checks passed
@GlutenPerfBot
Copy link
Contributor

===== Performance report for TPCH SF2000 with Velox backend, for reference only ====

query log/native_3842_time.csv log/native_master_11_26_2023_f456a7e86_time.csv difference percentage
q1 34.39 34.38 -0.012 99.97%
q2 25.15 24.62 -0.525 97.91%
q3 38.01 37.74 -0.267 99.30%
q4 37.54 36.67 -0.873 97.68%
q5 72.25 68.82 -3.432 95.25%
q6 7.26 7.12 -0.145 98.01%
q7 87.46 82.01 -5.452 93.77%
q8 88.77 87.72 -1.049 98.82%
q9 126.65 120.79 -5.854 95.38%
q10 43.25 44.22 0.962 102.22%
q11 20.21 19.96 -0.252 98.75%
q12 27.59 28.56 0.970 103.52%
q13 46.11 46.61 0.497 101.08%
q14 17.56 14.35 -3.217 81.68%
q15 27.53 29.15 1.620 105.89%
q16 15.49 15.41 -0.078 99.49%
q17 104.03 100.98 -3.051 97.07%
q18 151.85 145.61 -6.249 95.89%
q19 14.03 12.88 -1.144 91.85%
q20 28.18 27.79 -0.387 98.63%
q21 226.56 223.56 -2.995 98.68%
q22 13.12 13.03 -0.082 99.37%
total 1253.00 1221.98 -31.014 97.52%

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants