Skip to content

Commit

Permalink
modify code
Browse files Browse the repository at this point in the history
  • Loading branch information
starocean999 authored and morrySnow committed Sep 11, 2024
1 parent 8be2b8a commit cb0ff16
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,18 @@
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TMemoryScratchSink;
Expand Down Expand Up @@ -171,132 +171,148 @@ public Object query_plan(
private void handleQuery(ConnectContext context, String requestDb, String requestTable, String sql,
Map<String, Object> result) throws DorisHttpException {
List<StatementBase> stmts = null;
SessionVariable sessionVariable = context.getSessionVariable();
boolean needSetParallelResultSinkToFalse = false;
try {
stmts = new NereidsParser().parseSQL(sql, context.getSessionVariable());
} catch (Exception e) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST, e.getMessage());
}
// the parsed logical statement
StatementBase query = stmts.get(0);
if (!(query instanceof LogicalPlanAdapter)) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"Select statement needed, but found [" + sql + " ]");
}
LogicalPlan parsedPlan = ((LogicalPlanAdapter) query).getLogicalPlan();
// only process select semantic
if (parsedPlan instanceof Command) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"Select statement needed, but found [" + sql + " ]");
}
try {
if (!sessionVariable.enableParallelResultSink()) {
sessionVariable.setParallelResultSink(true);
needSetParallelResultSinkToFalse = true;
}
stmts = new NereidsParser().parseSQL(sql, context.getSessionVariable());
} catch (Exception e) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST, e.getMessage());
}
// the parsed logical statement
StatementBase query = stmts.get(0);
if (!(query instanceof LogicalPlanAdapter)) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"Select statement needed, but found [" + sql + " ]");
}
LogicalPlan parsedPlan = ((LogicalPlanAdapter) query).getLogicalPlan();
// only process select semantic
if (parsedPlan instanceof Command) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"Select statement needed, but found [" + sql + " ]");
}

if (!parsedPlan.collectToList(LogicalSubQueryAlias.class::isInstance).isEmpty()) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"Select statement must not embed another statement");
}
if (!parsedPlan.collectToList(LogicalSubQueryAlias.class::isInstance).isEmpty()) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"Select statement must not embed another statement");
}

List<UnboundRelation> unboundRelations = parsedPlan.collectToList(UnboundRelation.class::isInstance);
if (unboundRelations.size() != 1) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"Select statement must have only one table");
}
List<UnboundRelation> unboundRelations = parsedPlan.collectToList(UnboundRelation.class::isInstance);
if (unboundRelations.size() != 1) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"Select statement must have only one table");
}

// check consistent http requested resource with sql referenced
// if consistent in this way, can avoid check privilege
List<String> tableQualifier = RelationUtil.getQualifierName(context,
unboundRelations.get(0).getNameParts());
if (tableQualifier.size() != 3) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"can't find table " + String.join(",", tableQualifier));
}
String dbName = tableQualifier.get(1);
String tableName = tableQualifier.get(2);
// check consistent http requested resource with sql referenced
// if consistent in this way, can avoid check privilege
List<String> tableQualifier = RelationUtil.getQualifierName(context,
unboundRelations.get(0).getNameParts());
if (tableQualifier.size() != 3) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"can't find table " + String.join(",", tableQualifier));
}
String dbName = tableQualifier.get(1);
String tableName = tableQualifier.get(2);

if (GlobalVariable.lowerCaseTableNames == 0) {
if (!(dbName.equals(requestDb) && tableName.equals(requestTable))) {
if (GlobalVariable.lowerCaseTableNames == 0) {
if (!(dbName.equals(requestDb) && tableName.equals(requestTable))) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"requested database and table must consistent with sql: request [ "
+ requestDb + "." + requestTable + "]" + "and sql [" + dbName
+ "." + tableName + "]");
}
} else {
if (!(dbName.equalsIgnoreCase(requestDb)
&& tableName.equalsIgnoreCase(requestTable))) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"requested database and table must consistent with sql: request [ "
+ requestDb + "." + requestTable + "]" + "and sql [" + dbName
+ "." + tableName + "]");
}
}
NereidsPlanner nereidsPlanner = new NereidsPlanner(context.getStatementContext());
LogicalPlan rewrittenPlan = (LogicalPlan) nereidsPlanner.planWithLock(parsedPlan,
PhysicalProperties.GATHER, ExplainCommand.ExplainLevel.REWRITTEN_PLAN);
if (!rewrittenPlan.allMatch(planTreeNode -> planTreeNode instanceof LogicalOlapScan
|| planTreeNode instanceof LogicalFilter || planTreeNode instanceof LogicalProject
|| planTreeNode instanceof LogicalResultSink)) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"requested database and table must consistent with sql: request [ "
+ requestDb + "." + requestTable + "]" + "and sql [" + dbName + "." + tableName + "]");
"only support single table filter-prune-scan, but found [ " + sql + "]");
}
} else {
if (!(dbName.equalsIgnoreCase(requestDb)
&& tableName.equalsIgnoreCase(requestTable))) {
NereidsPlanner planner = new NereidsPlanner(context.getStatementContext());
try {
planner.plan(query, context.getSessionVariable().toThrift());
} catch (Exception ex) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"requested database and table must consistent with sql: request [ "
+ requestDb + "." + requestTable + "]" + "and sql [" + dbName + "." + tableName + "]");
"only support single table filter-prune-scan, but found [ " + sql + "]");
}
}
NereidsPlanner nereidsPlanner = new NereidsPlanner(context.getStatementContext());
LogicalPlan rewrittenPlan = (LogicalPlan) nereidsPlanner.planWithLock(parsedPlan,
PhysicalProperties.GATHER, ExplainCommand.ExplainLevel.REWRITTEN_PLAN);
if (rewrittenPlan.anyMatch(planTreeNode -> planTreeNode instanceof LogicalAggregate
|| planTreeNode instanceof LogicalWindow || planTreeNode instanceof LogicalSort
|| planTreeNode instanceof LogicalTopN || planTreeNode instanceof LogicalLimit)) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"only support single table filter-prune-scan, but found [ " + sql + "]");
}
NereidsPlanner planner = new NereidsPlanner(context.getStatementContext());
try {
planner.plan(query, context.getSessionVariable().toThrift());
} catch (Exception ex) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"only support single table filter-prune-scan, but found [ " + sql + "]");
}

// acquire ScanNode to obtain pruned tablet
// in this way, just retrieve only one scannode
List<ScanNode> scanNodes = planner.getScanNodes();
if (scanNodes.size() != 1) {
throw new DorisHttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR,
"Planner should plan just only one ScanNode but found [ " + scanNodes.size() + "]");
}
List<TScanRangeLocations> scanRangeLocations = scanNodes.get(0).getScanRangeLocations(0);
// acquire the PlanFragment which the executable template
List<PlanFragment> fragments = planner.getFragments();
if (fragments.size() != 1) {
throw new DorisHttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR,
"Planner should plan just only one PlanFragment but found [ " + fragments.size() + "]");
}
// acquire ScanNode to obtain pruned tablet
// in this way, just retrieve only one scannode
List<ScanNode> scanNodes = planner.getScanNodes();
if (scanNodes.size() != 1) {
throw new DorisHttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR,
"Planner should plan just only one ScanNode but found [ " + scanNodes.size() + "]");
}
List<TScanRangeLocations> scanRangeLocations = scanNodes.get(0).getScanRangeLocations(0);
// acquire the PlanFragment which the executable template
List<PlanFragment> fragments = planner.getFragments();
if (fragments.size() != 1) {
throw new DorisHttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR,
"Planner should plan just only one PlanFragment but found [ " + fragments.size() + "]");
}

TQueryPlanInfo tQueryPlanInfo = new TQueryPlanInfo();
TQueryPlanInfo tQueryPlanInfo = new TQueryPlanInfo();


// acquire TPlanFragment
TPlanFragment tPlanFragment = fragments.get(0).toThrift();
// set up TMemoryScratchSink
TDataSink tDataSink = new TDataSink();
tDataSink.type = TDataSinkType.MEMORY_SCRATCH_SINK;
tDataSink.memory_scratch_sink = new TMemoryScratchSink();
tPlanFragment.output_sink = tDataSink;
// acquire TPlanFragment
TPlanFragment tPlanFragment = fragments.get(0).toThrift();
// set up TMemoryScratchSink
TDataSink tDataSink = new TDataSink();
tDataSink.type = TDataSinkType.MEMORY_SCRATCH_SINK;
tDataSink.memory_scratch_sink = new TMemoryScratchSink();
tPlanFragment.output_sink = tDataSink;

tQueryPlanInfo.plan_fragment = tPlanFragment;
tQueryPlanInfo.desc_tbl = planner.getDescTable().toThrift();
// set query_id
UUID uuid = UUID.randomUUID();
tQueryPlanInfo.query_id = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
tQueryPlanInfo.plan_fragment = tPlanFragment;
tQueryPlanInfo.desc_tbl = planner.getDescTable().toThrift();
// set query_id
UUID uuid = UUID.randomUUID();
tQueryPlanInfo.query_id = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());

Map<Long, TTabletVersionInfo> tabletInfo = new HashMap<>();
// acquire resolved tablet distribution
Map<String, Node> tabletRoutings = assemblePrunedPartitions(scanRangeLocations);
tabletRoutings.forEach((tabletId, node) -> {
long tablet = Long.parseLong(tabletId);
tabletInfo.put(tablet, new TTabletVersionInfo(tablet, node.version, 0L /*version hash*/, node.schemaHash));
});
tQueryPlanInfo.tablet_info = tabletInfo;
Map<Long, TTabletVersionInfo> tabletInfo = new HashMap<>();
// acquire resolved tablet distribution
Map<String, Node> tabletRoutings = assemblePrunedPartitions(scanRangeLocations);
tabletRoutings.forEach((tabletId, node) -> {
long tablet = Long.parseLong(tabletId);
tabletInfo.put(tablet, new TTabletVersionInfo(tablet, node.version,
0L /*version hash*/, node.schemaHash));
});
tQueryPlanInfo.tablet_info = tabletInfo;

// serialize TQueryPlanInfo and encode plan with Base64 to string in order to translate by json format
TSerializer serializer;
String opaquedQueryPlan;
try {
serializer = new TSerializer();
byte[] queryPlanStream = serializer.serialize(tQueryPlanInfo);
opaquedQueryPlan = Base64.getEncoder().encodeToString(queryPlanStream);
} catch (TException e) {
throw new DorisHttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR,
"TSerializer failed to serialize PlanFragment, reason [ " + e.getMessage() + " ]");
// serialize TQueryPlanInfo and encode plan with Base64 to string in order to translate by json format
TSerializer serializer;
String opaquedQueryPlan;
try {
serializer = new TSerializer();
byte[] queryPlanStream = serializer.serialize(tQueryPlanInfo);
opaquedQueryPlan = Base64.getEncoder().encodeToString(queryPlanStream);
} catch (TException e) {
throw new DorisHttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR,
"TSerializer failed to serialize PlanFragment, reason [ " + e.getMessage() + " ]");
}
result.put("partitions", tabletRoutings);
result.put("opaqued_query_plan", opaquedQueryPlan);
result.put("status", 200);
} finally {
if (needSetParallelResultSinkToFalse) {
sessionVariable.setParallelResultSink(false);
}
}
result.put("partitions", tabletRoutings);
result.put("opaqued_query_plan", opaquedQueryPlan);
result.put("status", 200);

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,23 @@ default boolean anyMatch(Predicate<TreeNode<NODE_TYPE>> predicate) {
return false;
}

/**
* iterate top down and test predicate if all matched. Top-down traverse implicitly.
* @param predicate predicate
* @return true if all predicate return true
*/
default boolean allMatch(Predicate<TreeNode<NODE_TYPE>> predicate) {
if (!predicate.test(this)) {
return false;
}
for (NODE_TYPE child : children()) {
if (!child.allMatch(predicate)) {
return false;
}
}
return true;
}

/**
* Collect the nodes that satisfied the predicate.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,22 @@ public void testNotOlapTableFailure() throws IOException {
String exception = (String) jsonObject.get("data");
Assert.assertTrue(exception.contains("table type is not OLAP"));
}

@Test
public void testHasAggFailure() throws IOException {
RequestBody body = RequestBody.create(
"{ \"sql\" : \" select k1,k2 from " + DB_NAME + "." + TABLE_NAME + " group by k1, k2 \" }", JSON);
Request request = new Request.Builder()
.post(body)
.addHeader("Authorization", rootAuth)
.url(URI + PATH_URI)
.build();
Response response = networkClient.newCall(request).execute();
Assert.assertNotNull(response.body());
String respStr = response.body().string();
Assert.assertNotNull(respStr);
JSONObject jsonObject = (JSONObject) JSONValue.parse(respStr);
String exception = jsonObject.get("data").toString();
Assert.assertTrue(exception.contains("only support single table filter-prune-scan"));
}
}

0 comments on commit cb0ff16

Please sign in to comment.