Skip to content

Commit

Permalink
Optimize SQL explain steps
Browse files Browse the repository at this point in the history
  • Loading branch information
aiwenmo committed Oct 23, 2024
1 parent 474aed5 commit 99d0287
Show file tree
Hide file tree
Showing 19 changed files with 463 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.dinky.executor;

import org.dinky.data.exception.DinkyException;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.parser.CustomParserImpl;
import org.dinky.utils.JsonUtils;
Expand Down Expand Up @@ -162,29 +163,32 @@ public StreamGraph getStreamGraphFromModifyOperations(List<ModifyOperation> modi
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
List<Operation> operations = getParser().parse(statement);
if (operations.size() != 1) {
throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
throw new DinkyException("Unsupported SQL explain! explainSql() only accepts a single SQL.");
}
return explainOperation(operations);
}

Operation operation = operations.get(0);
public SqlExplainResult explainOperation(List<Operation> operations, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult();
if (operations.isEmpty()) {
throw new DinkyException("No statement is explained.");
}
record.setParseTrue(true);
record.setExplainTrue(true);

Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
record.setExplain(getPlanner().explain(operations, extraDetails));
record.setType("Modify DML");
} else if (operation instanceof ExplainOperation) {
record.setExplain(getPlanner().explain(operations, extraDetails));
record.setType("Explain DML");
} else if (operation instanceof QueryOperation) {
record.setExplain(getPlanner().explain(operations, extraDetails));
record.setType("Query DML");
} else {
record.setExplain(operation.asSummaryString());
record.setType("DDL");

// record.setExplain("DDL statement needn't comment。");
return record;
}

record.setExplain(getPlanner().explain(operations, extraDetails));
record.setExplainTrue(true);
return record;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;

import java.io.File;
import java.net.URL;
Expand Down Expand Up @@ -67,6 +68,8 @@ public interface CustomTableEnvironment

SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails);

SqlExplainResult explainOperation(List<Operation> operations, ExplainDetail... extraDetails);

StreamExecutionEnvironment getStreamExecutionEnvironment();

Planner getPlanner();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.dinky.data.model;

import org.dinky.assertion.Asserts;
import org.dinky.context.EngineContextHolder;
import org.dinky.data.constant.CommonConstant;
import org.dinky.data.constant.DirConstant;
Expand Down Expand Up @@ -387,7 +388,7 @@ public Map<String, List<Configuration<?>>> getAllConfiguration() {
}

public boolean isUseRestAPI() {
return useRestAPI.getValue();
return Asserts.isNull(useRestAPI.getValue()) ? useRestAPI.getDefaultValue() : useRestAPI.getValue();
}

public int getJobIdWait() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@

import java.time.LocalDateTime;

/**
* 解释结果
*
* @since 2021/6/7 22:06
*/
public class SqlExplainResult {

private Integer index;
Expand All @@ -36,6 +31,9 @@ public class SqlExplainResult {
private String error;
private boolean parseTrue;
private boolean explainTrue;

private boolean isSkipped;

private LocalDateTime explainTime;

public SqlExplainResult() {}
Expand Down Expand Up @@ -71,6 +69,7 @@ private SqlExplainResult(Builder builder) {
setParseTrue(builder.parseTrue);
setExplainTrue(builder.explainTrue);
setExplainTime(builder.explainTime);
setSkipped(builder.isSkipped);
}

public static SqlExplainResult success(String type, String sql, String explain) {
Expand All @@ -92,6 +91,7 @@ public static Builder newBuilder(SqlExplainResult copy) {
builder.parseTrue = copy.isParseTrue();
builder.explainTrue = copy.isExplainTrue();
builder.explainTime = copy.getExplainTime();
builder.isSkipped = copy.isSkipped();
return builder;
}

Expand Down Expand Up @@ -167,6 +167,14 @@ public void setExplainTime(LocalDateTime explainTime) {
this.explainTime = explainTime;
}

public boolean isSkipped() {
return isSkipped;
}

public void setSkipped(boolean skipped) {
isSkipped = skipped;
}

@Override
public String toString() {
return String.format(
Expand All @@ -184,6 +192,7 @@ public static final class Builder {
private String error;
private boolean parseTrue;
private boolean explainTrue;
private boolean isSkipped = false;
private LocalDateTime explainTime;

private Builder() {}
Expand Down Expand Up @@ -232,6 +241,11 @@ public Builder explainTrue(boolean val) {
return this;
}

public Builder isSkipped() {
isSkipped = true;
return this;
}

public Builder explainTime(LocalDateTime val) {
explainTime = val;
return this;
Expand Down
5 changes: 5 additions & 0 deletions dinky-core/src/main/java/org/dinky/executor/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;

import java.io.File;
import java.net.URL;
Expand Down Expand Up @@ -247,6 +248,10 @@ public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extr
return null;
}

public SqlExplainResult explainOperation(List<Operation> operations, ExplainDetail... extraDetails) {
return tableEnvironment.explainOperation(operations, extraDetails);
}

public ObjectNode getStreamGraph(List<String> statements) {
StreamGraph streamGraph = tableEnvironment.getStreamGraphFromInserts(statements);
return getStreamGraphJsonNode(streamGraph);
Expand Down
51 changes: 41 additions & 10 deletions dinky-core/src/main/java/org/dinky/explainer/Explainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.dinky.job.JobConfig;
import org.dinky.job.JobManager;
import org.dinky.job.JobParam;
import org.dinky.job.JobRunnerFactory;
import org.dinky.job.JobStatement;
import org.dinky.job.JobStatementPlan;
import org.dinky.job.JobStatementType;
import org.dinky.job.StatementParam;
Expand Down Expand Up @@ -124,7 +126,7 @@ public JobStatementPlan parseStatements(String[] statements) {
udfStatements.add(sql);
}));
for (String udfStatement : udfStatements) {
jobStatementPlan.addJobStatementGenerated(udfStatement, JobStatementType.UDF);
jobStatementPlan.addJobStatementGenerated(udfStatement, JobStatementType.UDF, SqlType.CREATE);
}

for (String item : statements) {
Expand All @@ -134,26 +136,26 @@ public JobStatementPlan parseStatements(String[] statements) {
}
SqlType operationType = Operations.getOperationType(statement);
if (operationType.equals(SqlType.SET) && SetSqlParseStrategy.INSTANCE.match(statement)) {
jobStatementPlan.addJobStatement(statement, JobStatementType.SET);
jobStatementPlan.addJobStatement(statement, JobStatementType.SET, operationType);
} else if (operationType.equals(SqlType.ADD)) {
jobStatementPlan.addJobStatement(statement, JobStatementType.ADD);
jobStatementPlan.addJobStatement(statement, JobStatementType.ADD, operationType);
} else if (operationType.equals(SqlType.ADD_FILE)) {
jobStatementPlan.addJobStatement(statement, JobStatementType.ADD_FILE);
jobStatementPlan.addJobStatement(statement, JobStatementType.ADD_FILE, operationType);
} else if (operationType.equals(SqlType.ADD_JAR)) {
jobStatementPlan.addJobStatement(statement, JobStatementType.ADD_JAR);
jobStatementPlan.addJobStatement(statement, JobStatementType.ADD_JAR, operationType);
} else if (SqlType.getTransSqlTypes().contains(operationType)) {
jobStatementPlan.addJobStatement(statement, JobStatementType.SQL);
jobStatementPlan.addJobStatement(statement, JobStatementType.SQL, operationType);
if (!useStatementSet) {
break;
}
} else if (operationType.equals(SqlType.EXECUTE)) {
jobStatementPlan.addJobStatement(statement, JobStatementType.EXECUTE);
jobStatementPlan.addJobStatement(statement, JobStatementType.EXECUTE, operationType);
} else if (operationType.equals(SqlType.PRINT)) {
jobStatementPlan.addJobStatement(statement, JobStatementType.PRINT);
jobStatementPlan.addJobStatement(statement, JobStatementType.PRINT, operationType);
} else if (UDFUtil.isUdfStatement(statement)) {
jobStatementPlan.addJobStatement(statement, JobStatementType.UDF);
jobStatementPlan.addJobStatement(statement, JobStatementType.UDF, operationType);
} else {
jobStatementPlan.addJobStatement(statement, JobStatementType.DDL);
jobStatementPlan.addJobStatement(statement, JobStatementType.DDL, operationType);
}
}
return jobStatementPlan;
Expand Down Expand Up @@ -257,6 +259,35 @@ public List<UDF> parseUDFFromStatements(String[] statements) {
}

public ExplainResult explainSql(String statement) {
log.info("Start explain FlinkSQL...");
JobStatementPlan jobStatementPlan;
List<SqlExplainResult> sqlExplainRecords = new ArrayList<>();
boolean correct = true;
try {
jobStatementPlan = parseStatements(SqlUtil.getStatements(statement));
jobStatementPlan.buildFinalExecutableStatement();
} catch (Exception e) {
SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder();
resultBuilder.error(e.getMessage()).parseTrue(false);
sqlExplainRecords.add(resultBuilder.build());
log.error("Failed parseStatements:", e);
return new ExplainResult(false, sqlExplainRecords.size(), sqlExplainRecords);
}
JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(jobManager);
for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) {
SqlExplainResult sqlExplainResult = jobRunnerFactory
.getJobRunner(jobStatement.getStatementType())
.explain(jobStatement);
if (!sqlExplainResult.isSkipped()) {
sqlExplainRecords.add(sqlExplainResult);
}
}

log.info(StrUtil.format("A total of {} FlinkSQL have been Explained.", sqlExplainRecords.size()));
return new ExplainResult(correct, sqlExplainRecords.size(), sqlExplainRecords);
}

public ExplainResult explainSql2(String statement) {
log.info("Start explain FlinkSQL...");
JobParam jobParam;
List<SqlExplainResult> sqlExplainRecords = new ArrayList<>();
Expand Down
64 changes: 64 additions & 0 deletions dinky-core/src/main/java/org/dinky/job/AbstractJobRunner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.job;

import org.dinky.data.result.SqlExplainResult;
import org.dinky.utils.LogUtil;
import org.dinky.utils.SqlUtil;

import java.time.LocalDateTime;

import cn.hutool.core.text.StrFormatter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public abstract class AbstractJobRunner implements JobRunner {

protected JobManager jobManager;

public SqlExplainResult explain(JobStatement jobStatement) {
SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder();
try {
run(jobStatement);
resultBuilder
.parseTrue(true)
.explainTrue(true)
.type(jobStatement.getSqlType().getType())
.sql(jobStatement.getStatement())
.index(jobStatement.getIndex());
} catch (Exception e) {
String error = StrFormatter.format(
"Exception in explaining FlinkSQL:\n{}\n{}",
SqlUtil.addLineNumber(jobStatement.getStatement()),
LogUtil.getError(e));
resultBuilder
.parseTrue(false)
.error(error)
.explainTrue(false)
.type(jobStatement.getSqlType().getType())
.sql(jobStatement.getStatement())
.index(jobStatement.getIndex());
log.error(error);
} finally {
resultBuilder.explainTime(LocalDateTime.now());
return resultBuilder.build();
}
}
}
12 changes: 7 additions & 5 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ public JobResult executeJarSql(String statement) throws Exception {
jobJarRunner.run(jobStatement);
}

JobJarStreamGraphBuilder.build(this).run();
if (job.isFailed()) {
failed();
} else {
Expand Down Expand Up @@ -295,10 +294,9 @@ public JobResult executeSql(String statement) throws Exception {
Explainer.build(executor, useStatementSet, this).parseStatements(SqlUtil.getStatements(statement));
try {
jobStatementPlan.buildFinalExecutableStatement();

JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this);
for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) {
JobRunnerFactory.getJobRunner(jobStatement.getStatementType(), this)
.run(jobStatement);
jobRunnerFactory.getJobRunner(jobStatement.getStatementType()).run(jobStatement);
}

job.setEndTime(LocalDateTime.now());
Expand Down Expand Up @@ -369,10 +367,14 @@ public static SelectResult getJobData(String jobId) {
}

public ExplainResult explainSql(String statement) {
return Explainer.build(executor, useStatementSet, this).explainSql(statement);
}

/*public ExplainResult explainSql(String statement) {
return Explainer.build(executor, useStatementSet, this)
.initialize(config, statement)
.explainSql(statement);
}
}*/

public ObjectNode getStreamGraph(String statement) {
return Explainer.build(executor, useStatementSet, this)
Expand Down
4 changes: 4 additions & 0 deletions dinky-core/src/main/java/org/dinky/job/JobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@

package org.dinky.job;

import org.dinky.data.result.SqlExplainResult;

public interface JobRunner {

void run(JobStatement jobStatement) throws Exception;

SqlExplainResult explain(JobStatement jobStatement);
}
Loading

0 comments on commit 99d0287

Please sign in to comment.