
[SPARK-49919][SQL] Add special limits support for return content as JSON dataset #48407

Open · wants to merge 2 commits into master

Conversation

@LantaoJin (Contributor) commented on Oct 10, 2024

What changes were proposed in this pull request?

CollectLimitExec is used when a logical Limit and/or Offset operation is the final operator. Compared to GlobalLimitExec, it avoids shuffling data to a single output partition.
But when the result is collected as a Dataset of JSON strings, CollectLimitExec and TakeOrderedAndProjectExec cannot be applied, because the SpecialLimits strategy no longer works as expected.
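For context, the SpecialLimits strategy pattern-matches on the root of the plan. Roughly (a heavily simplified paraphrase of SparkStrategies.SpecialLimits, which in reality also handles offsets, projections and the top-K fallback threshold):

// Simplified paraphrase, not the exact Spark source:
plan match {
  // sort + limit at the root: plan a top-K that avoids a full sort and shuffle
  case ReturnAnswer(Limit(IntegerLiteral(limit), Sort(order, true, child))) =>
    TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
  // plain limit at the root: collect the limit without a single-partition shuffle
  case ReturnAnswer(Limit(IntegerLiteral(limit), child)) =>
    CollectLimitExec(limit, planLater(child)) :: Nil
  case _ => Nil
}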
Here is an example. The following query is a simple select-limit query:

scala> spark.sql("select * from right_t limit 4").explain
== Physical Plan ==
CollectLimit 4
+- Scan hive spark_catalog.default.right_t [id#23, name#24], HiveTableRelation [`spark_catalog`.`default`.`right_t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#23, name#24], Partition Cols: []]

When we add the toJSON method, the plan changes unexpectedly and introduces a shuffle.

scala> spark.sql("select * from right_t limit 4").toJSON.explain
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false, true) AS value#34]
   +- MapPartitions org.apache.spark.sql.Dataset$$Lambda/0x00000070021d8c58@5b17838a, obj#33: java.lang.String
      +- DeserializeToObject createexternalrow(staticinvoke(class java.lang.Integer, ObjectType(class java.lang.Integer), valueOf, id#27, true, false, true), name#28.toString, StructField(id,IntegerType,true), StructField(name,StringType,true)), obj#32: org.apache.spark.sql.Row
         +- GlobalLimit 4, 0
            +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=42]
               +- LocalLimit 4
                  +- Scan hive spark_catalog.default.right_t [id#27, name#28], HiveTableRelation [`spark_catalog`.`default`.`right_t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#27, name#28], Partition Cols: []]
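The mismatch happens because toJSON inserts DeserializeToObject / MapPartitions / SerializeFromObject between ReturnAnswer and the Limit, so the Limit is no longer at the root where SpecialLimits matches it. A quick way to observe this from the shell (a minimal sketch; right_t is the example table above, and sparkPlan is inspected before adaptive-execution wrapping):

import org.apache.spark.sql.execution.CollectLimitExec

val df = spark.sql("select * from right_t limit 4").toJSON
// sparkPlan is the physical plan before adaptive-execution wrapping
val usesCollectLimit = df.queryExecution.sparkPlan.collectFirst {
  case c: CollectLimitExec => c
}.isDefined
// false without this patch (a GlobalLimitExec + Exchange is planned instead)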

Why are the changes needed?

Without this patch, a simple "select ... limit" or "select ... order by ... limit" query has to introduce a shuffle when the result is returned as a JSON dataset, because neither CollectLimitExec nor TakeOrderedAndProjectExec can be applied.
Since Dataset.toJSON is a fundamental API, this leads to poor performance in many scenarios.

Does this PR introduce any user-facing change?

No

How was this patch tested?

UT added.
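For reference, a minimal sketch of the shape such a test could take (the test name, table, and assertions are illustrative assumptions, not the PR's actual test; it assumes a suite mixing in SharedSparkSession and SQLTestUtils):

import org.apache.spark.sql.execution.CollectLimitExec

test("SPARK-49919: toJSON keeps CollectLimitExec for a final limit") {
  withTempView("t") {
    spark.range(100).toDF("id").createOrReplaceTempView("t")
    val df = spark.sql("select * from t limit 4").toJSON
    // The special limit should survive the serialization nodes added by toJSON
    assert(df.queryExecution.sparkPlan.collectFirst {
      case c: CollectLimitExec => c
    }.isDefined)
    assert(df.collect().length === 4)
  }
}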

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions bot added the SQL label on Oct 10, 2024
@LantaoJin (Contributor, Author) commented:

ping @wangyum @cloud-fan
