Skip to content

Commit

Permalink
Fix the duplicated key exception in TopN (#3655)
Browse files Browse the repository at this point in the history
The TakeOrderedAndProjectExec will be transformed into either a sort + limit or directly into a limit operation. If it is a sort + limit case, Velox will convert it into a TopNNode.  This PR do some validations on the TopNNode.
  • Loading branch information
JkSelf authored Nov 10, 2023
1 parent 33003d4 commit 3ecf596
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 30 deletions.
52 changes: 28 additions & 24 deletions cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -347,45 +347,49 @@ bool SubstraitToVeloxPlanValidator::validateExpression(
}

bool SubstraitToVeloxPlanValidator::validate(const ::substrait::FetchRel& fetchRel) {
const auto& extension = fetchRel.advanced_extension();
std::vector<TypePtr> types;
if (!validateInputTypes(extension, types)) {
logValidateMsg("native validation failed due to: unsupported input types in FetchRel.");
return false;
RowTypePtr rowType = nullptr;
// Get and validate the input types from extension.
if (fetchRel.has_advanced_extension()) {
const auto& extension = fetchRel.advanced_extension();
std::vector<TypePtr> types;
if (!validateInputTypes(extension, types)) {
logValidateMsg("native validation failed due to: unsupported input types in ExpandRel.");
return false;
}

int32_t inputPlanNodeId = 0;
std::vector<std::string> names;
names.reserve(types.size());
for (auto colIdx = 0; colIdx < types.size(); colIdx++) {
names.emplace_back(SubstraitParser::makeNodeName(inputPlanNodeId, colIdx));
}
rowType = std::make_shared<RowType>(std::move(names), std::move(types));
}

if (fetchRel.offset() < 0 || fetchRel.count() < 0) {
logValidateMsg("native validation failed due to: Offset and count should be valid in FetchRel.");
return false;
}

core::PlanNodePtr childNode;
// Check the input of fetchRel, if it's sortRel, we need to check whether the sorting key is duplicated.
::substrait::SortRel sortRel;
bool topNFlag = false;
if (fetchRel.has_input()) {
topNFlag = fetchRel.input().has_sort();
if (topNFlag) {
sortRel = fetchRel.input().sort();
childNode = planConverter_.toVeloxPlan(sortRel.input());
} else {
childNode = planConverter_.toVeloxPlan(fetchRel.input());
}
}

if (topNFlag) {
auto [sortingKeys, sortingOrders] = planConverter_.processSortField(sortRel.sorts(), childNode->outputType());

folly::F14FastSet<std::string> sortingKeyNames;
for (const auto& sortingKey : sortingKeys) {
auto result = sortingKeyNames.insert(sortingKey->name());
if (!result.second) {
logValidateMsg(
"native validation failed due to: if the input of fetchRel is a SortRel, we will convert it to a TopNNode. In Velox, it is important to ensure unique sorting keys. However, duplicate keys were found in this case.");
return false;
::substrait::SortRel sortRel = fetchRel.input().sort();
auto [sortingKeys, sortingOrders] = planConverter_.processSortField(sortRel.sorts(), rowType);
folly::F14FastSet<std::string> sortingKeyNames;
for (const auto& sortingKey : sortingKeys) {
auto result = sortingKeyNames.insert(sortingKey->name());
if (!result.second) {
logValidateMsg(
"native validation failed due to: if the input of fetchRel is a SortRel, we will convert it to a TopNNode. In Velox, it is important to ensure unique sorting keys. However, duplicate keys were found in this case.");
return false;
}
}
}
}

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ case class LimitTransformer(child: SparkPlan, offset: Long, count: Long)
override protected def doValidateInternal(): ValidationResult = {
val context = new SubstraitContext
val operatorId = context.nextOperatorId(this.nodeName)
val relNode = getRelNode(context, operatorId, offset, count, child.output, null, true)
val input = child match {
case c: TransformSupport => c.doTransform(context).root
case _ => null
}
val relNode = getRelNode(context, operatorId, offset, count, child.output, input, true)

doNativeValidation(context, relNode)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import io.glutenproject.utils.PhysicalPlanSelector
import org.apache.spark.api.python.EvalPythonExecTransformer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SortOrder}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.FullOuter
import org.apache.spark.sql.catalyst.rules.Rule
Expand Down Expand Up @@ -677,12 +677,17 @@ case class AddTransformHintRule() extends Rule[SparkPlan] {
"columnar topK is not enabled in TakeOrderedAndProjectExec")
} else {
var tagged: ValidationResult = null
val limitPlan = LimitTransformer(plan.child, 0, plan.limit)
tagged = limitPlan.doValidate()
if (tagged.isValid) {
val orderingSatisfies =
SortOrder.orderingSatisfies(plan.child.outputOrdering, plan.sortOrder)
if (orderingSatisfies) {
val limitPlan = LimitTransformer(plan.child, 0, plan.limit)
tagged = limitPlan.doValidate()
} else {
val sortPlan = SortExecTransformer(plan.sortOrder, false, plan.child)
tagged = sortPlan.doValidate()
val limitPlan = LimitTransformer(sortPlan, 0, plan.limit)
tagged = limitPlan.doValidate()
}

if (tagged.isValid) {
val projectPlan = ProjectExecTransformer(plan.projectList, plan.child)
tagged = projectPlan.doValidate()
Expand Down

0 comments on commit 3ecf596

Please sign in to comment.