From 99d02878b5f2c2dcdcb0481f47940a05795979cc Mon Sep 17 00:00:00 2001 From: wenmo <32723967+aiwenmo@users.noreply.github.com> Date: Thu, 24 Oct 2024 01:17:02 +0800 Subject: [PATCH] Optimize SQL explain steps --- .../executor/CustomTableEnvironmentImpl.java | 22 +++--- .../executor/CustomTableEnvironment.java | 3 + .../dinky/data/model/SystemConfiguration.java | 3 +- .../dinky/data/result/SqlExplainResult.java | 24 +++++-- .../java/org/dinky/executor/Executor.java | 5 ++ .../java/org/dinky/explainer/Explainer.java | 51 +++++++++++--- .../java/org/dinky/job/AbstractJobRunner.java | 64 ++++++++++++++++++ .../main/java/org/dinky/job/JobManager.java | 12 ++-- .../main/java/org/dinky/job/JobRunner.java | 4 ++ .../java/org/dinky/job/JobRunnerFactory.java | 39 ++++++++--- .../java/org/dinky/job/JobStatementPlan.java | 10 +-- .../org/dinky/job/runner/JobAddRunner.java | 7 +- .../org/dinky/job/runner/JobDDLRunner.java | 48 ++++++++++++- .../dinky/job/runner/JobExecuteRunner.java | 58 +++++++++++++++- .../org/dinky/job/runner/JobJarRunner.java | 67 ++++++++++++++----- .../org/dinky/job/runner/JobPrintRunner.java | 51 +++++++++++++- .../org/dinky/job/runner/JobSetRunner.java | 7 +- .../org/dinky/job/runner/JobSqlRunner.java | 67 ++++++++++++++++--- .../org/dinky/job/runner/JobUDFRunner.java | 6 +- 19 files changed, 463 insertions(+), 85 deletions(-) create mode 100644 dinky-core/src/main/java/org/dinky/job/AbstractJobRunner.java diff --git a/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java index 9ee50a616e..57f352c8b9 100644 --- a/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java +++ b/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java @@ -19,6 +19,7 @@ package org.dinky.executor; +import org.dinky.data.exception.DinkyException; import 
org.dinky.data.result.SqlExplainResult; import org.dinky.parser.CustomParserImpl; import org.dinky.utils.JsonUtils; @@ -162,29 +163,33 @@ public StreamGraph getStreamGraphFromModifyOperations(List modi public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) { List operations = getParser().parse(statement); if (operations.size() != 1) { - throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query."); + throw new DinkyException("Unsupported SQL explain! explainSql() only accepts a single SQL."); } + // Forward the caller's explain details; omitting them would silently ignore the requested detail level. + return explainOperation(operations, extraDetails); + } - Operation operation = operations.get(0); + public SqlExplainResult explainOperation(List operations, ExplainDetail... extraDetails) { SqlExplainResult record = new SqlExplainResult(); + if (operations.isEmpty()) { + throw new DinkyException("No statement is explained."); + } record.setParseTrue(true); - record.setExplainTrue(true); - + Operation operation = operations.get(0); if (operation instanceof ModifyOperation) { + record.setExplain(getPlanner().explain(operations, extraDetails)); record.setType("Modify DML"); } else if (operation instanceof ExplainOperation) { + record.setExplain(getPlanner().explain(operations, extraDetails)); record.setType("Explain DML"); } else if (operation instanceof QueryOperation) { + record.setExplain(getPlanner().explain(operations, extraDetails)); record.setType("Query DML"); } else { record.setExplain(operation.asSummaryString()); record.setType("DDL"); - - // record.setExplain("DDL statement needn't comment。"); - return record; } - - record.setExplain(getPlanner().explain(operations, extraDetails)); + record.setExplainTrue(true); return record; } } diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java index 32f3d85d32..e67b4b385f 100644 ---
a/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java @@ -34,6 +34,7 @@ import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.table.delegation.Planner; import org.apache.flink.table.operations.ModifyOperation; +import org.apache.flink.table.operations.Operation; import java.io.File; import java.net.URL; @@ -67,6 +68,8 @@ public interface CustomTableEnvironment SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails); + SqlExplainResult explainOperation(List operations, ExplainDetail... extraDetails); + StreamExecutionEnvironment getStreamExecutionEnvironment(); Planner getPlanner(); diff --git a/dinky-common/src/main/java/org/dinky/data/model/SystemConfiguration.java b/dinky-common/src/main/java/org/dinky/data/model/SystemConfiguration.java index a82b233e36..539ca363a0 100644 --- a/dinky-common/src/main/java/org/dinky/data/model/SystemConfiguration.java +++ b/dinky-common/src/main/java/org/dinky/data/model/SystemConfiguration.java @@ -19,6 +19,7 @@ package org.dinky.data.model; +import org.dinky.assertion.Asserts; import org.dinky.context.EngineContextHolder; import org.dinky.data.constant.CommonConstant; import org.dinky.data.constant.DirConstant; @@ -387,7 +388,7 @@ public Map>> getAllConfiguration() { } public boolean isUseRestAPI() { - return useRestAPI.getValue(); + return Asserts.isNull(useRestAPI.getValue()) ? 
useRestAPI.getDefaultValue() : useRestAPI.getValue(); } public int getJobIdWait() { diff --git a/dinky-common/src/main/java/org/dinky/data/result/SqlExplainResult.java b/dinky-common/src/main/java/org/dinky/data/result/SqlExplainResult.java index b5cb123121..9ef0db33b4 100644 --- a/dinky-common/src/main/java/org/dinky/data/result/SqlExplainResult.java +++ b/dinky-common/src/main/java/org/dinky/data/result/SqlExplainResult.java @@ -21,11 +21,6 @@ import java.time.LocalDateTime; -/** - * 解释结果 - * - * @since 2021/6/7 22:06 - */ public class SqlExplainResult { private Integer index; @@ -36,6 +31,9 @@ public class SqlExplainResult { private String error; private boolean parseTrue; private boolean explainTrue; + + private boolean isSkipped; + private LocalDateTime explainTime; public SqlExplainResult() {} @@ -71,6 +69,7 @@ private SqlExplainResult(Builder builder) { setParseTrue(builder.parseTrue); setExplainTrue(builder.explainTrue); setExplainTime(builder.explainTime); + setSkipped(builder.isSkipped); } public static SqlExplainResult success(String type, String sql, String explain) { @@ -92,6 +91,7 @@ public static Builder newBuilder(SqlExplainResult copy) { builder.parseTrue = copy.isParseTrue(); builder.explainTrue = copy.isExplainTrue(); builder.explainTime = copy.getExplainTime(); + builder.isSkipped = copy.isSkipped(); return builder; } @@ -167,6 +167,14 @@ public void setExplainTime(LocalDateTime explainTime) { this.explainTime = explainTime; } + public boolean isSkipped() { + return isSkipped; + } + + public void setSkipped(boolean skipped) { + isSkipped = skipped; + } + @Override public String toString() { return String.format( @@ -184,6 +192,7 @@ public static final class Builder { private String error; private boolean parseTrue; private boolean explainTrue; + private boolean isSkipped = false; private LocalDateTime explainTime; private Builder() {} @@ -232,6 +241,11 @@ public Builder explainTrue(boolean val) { return this; } + public Builder isSkipped() { + 
isSkipped = true; + return this; + } + public Builder explainTime(LocalDateTime val) { explainTime = val; return this; diff --git a/dinky-core/src/main/java/org/dinky/executor/Executor.java b/dinky-core/src/main/java/org/dinky/executor/Executor.java index 68113b4853..1fb1ec97bf 100644 --- a/dinky-core/src/main/java/org/dinky/executor/Executor.java +++ b/dinky-core/src/main/java/org/dinky/executor/Executor.java @@ -45,6 +45,7 @@ import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.operations.ModifyOperation; +import org.apache.flink.table.operations.Operation; import java.io.File; import java.net.URL; @@ -247,6 +248,10 @@ public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extr return null; } + public SqlExplainResult explainOperation(List operations, ExplainDetail... extraDetails) { + return tableEnvironment.explainOperation(operations, extraDetails); + } + public ObjectNode getStreamGraph(List statements) { StreamGraph streamGraph = tableEnvironment.getStreamGraphFromInserts(statements); return getStreamGraphJsonNode(streamGraph); diff --git a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java index 048eedae01..69254a0bc8 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java @@ -34,6 +34,8 @@ import org.dinky.job.JobConfig; import org.dinky.job.JobManager; import org.dinky.job.JobParam; +import org.dinky.job.JobRunnerFactory; +import org.dinky.job.JobStatement; import org.dinky.job.JobStatementPlan; import org.dinky.job.JobStatementType; import org.dinky.job.StatementParam; @@ -124,7 +126,7 @@ public JobStatementPlan parseStatements(String[] statements) { udfStatements.add(sql); })); for (String udfStatement : udfStatements) { - jobStatementPlan.addJobStatementGenerated(udfStatement, JobStatementType.UDF); + 
jobStatementPlan.addJobStatementGenerated(udfStatement, JobStatementType.UDF, SqlType.CREATE); } for (String item : statements) { @@ -134,26 +136,26 @@ public JobStatementPlan parseStatements(String[] statements) { } SqlType operationType = Operations.getOperationType(statement); if (operationType.equals(SqlType.SET) && SetSqlParseStrategy.INSTANCE.match(statement)) { - jobStatementPlan.addJobStatement(statement, JobStatementType.SET); + jobStatementPlan.addJobStatement(statement, JobStatementType.SET, operationType); } else if (operationType.equals(SqlType.ADD)) { - jobStatementPlan.addJobStatement(statement, JobStatementType.ADD); + jobStatementPlan.addJobStatement(statement, JobStatementType.ADD, operationType); } else if (operationType.equals(SqlType.ADD_FILE)) { - jobStatementPlan.addJobStatement(statement, JobStatementType.ADD_FILE); + jobStatementPlan.addJobStatement(statement, JobStatementType.ADD_FILE, operationType); } else if (operationType.equals(SqlType.ADD_JAR)) { - jobStatementPlan.addJobStatement(statement, JobStatementType.ADD_JAR); + jobStatementPlan.addJobStatement(statement, JobStatementType.ADD_JAR, operationType); } else if (SqlType.getTransSqlTypes().contains(operationType)) { - jobStatementPlan.addJobStatement(statement, JobStatementType.SQL); + jobStatementPlan.addJobStatement(statement, JobStatementType.SQL, operationType); if (!useStatementSet) { break; } } else if (operationType.equals(SqlType.EXECUTE)) { - jobStatementPlan.addJobStatement(statement, JobStatementType.EXECUTE); + jobStatementPlan.addJobStatement(statement, JobStatementType.EXECUTE, operationType); } else if (operationType.equals(SqlType.PRINT)) { - jobStatementPlan.addJobStatement(statement, JobStatementType.PRINT); + jobStatementPlan.addJobStatement(statement, JobStatementType.PRINT, operationType); } else if (UDFUtil.isUdfStatement(statement)) { - jobStatementPlan.addJobStatement(statement, JobStatementType.UDF); + jobStatementPlan.addJobStatement(statement, 
JobStatementType.UDF, operationType); } else { - jobStatementPlan.addJobStatement(statement, JobStatementType.DDL); + jobStatementPlan.addJobStatement(statement, JobStatementType.DDL, operationType); } } return jobStatementPlan; @@ -257,6 +259,39 @@ public List parseUDFFromStatements(String[] statements) { } public ExplainResult explainSql(String statement) { + log.info("Start explain FlinkSQL..."); + JobStatementPlan jobStatementPlan; + List sqlExplainRecords = new ArrayList<>(); + boolean correct = true; + try { + jobStatementPlan = parseStatements(SqlUtil.getStatements(statement)); + jobStatementPlan.buildFinalExecutableStatement(); + } catch (Exception e) { + SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); + resultBuilder.error(e.getMessage()).parseTrue(false); + sqlExplainRecords.add(resultBuilder.build()); + log.error("Failed parseStatements:", e); + return new ExplainResult(false, sqlExplainRecords.size(), sqlExplainRecords); + } + JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(jobManager); + for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) { + SqlExplainResult sqlExplainResult = jobRunnerFactory + .getJobRunner(jobStatement.getStatementType()) + .explain(jobStatement); + if (!sqlExplainResult.isSkipped()) { + sqlExplainRecords.add(sqlExplainResult); + // Any reported statement that failed to explain makes the overall result incorrect. + if (!sqlExplainResult.isExplainTrue()) { + correct = false; + } + } + } + + log.info(StrUtil.format("A total of {} FlinkSQL have been Explained.", sqlExplainRecords.size())); + return new ExplainResult(correct, sqlExplainRecords.size(), sqlExplainRecords); + } + + public ExplainResult explainSql2(String statement) { log.info("Start explain FlinkSQL..."); JobParam jobParam; List sqlExplainRecords = new ArrayList<>(); diff --git a/dinky-core/src/main/java/org/dinky/job/AbstractJobRunner.java b/dinky-core/src/main/java/org/dinky/job/AbstractJobRunner.java new file mode 100644 index 0000000000..f175579e77 --- /dev/null +++ b/dinky-core/src/main/java/org/dinky/job/AbstractJobRunner.java @@ -0,0 +1,64 @@ +/* + *
+ * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.job; + +import org.dinky.data.result.SqlExplainResult; +import org.dinky.utils.LogUtil; +import org.dinky.utils.SqlUtil; + +import java.time.LocalDateTime; + +import cn.hutool.core.text.StrFormatter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class AbstractJobRunner implements JobRunner { + + protected JobManager jobManager; + + public SqlExplainResult explain(JobStatement jobStatement) { + SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); + try { + run(jobStatement); + resultBuilder + .parseTrue(true) + .explainTrue(true) + .type(jobStatement.getSqlType().getType()) + .sql(jobStatement.getStatement()) + .index(jobStatement.getIndex()); + } catch (Exception e) { + String error = StrFormatter.format( + "Exception in explaining FlinkSQL:\n{}\n{}", + SqlUtil.addLineNumber(jobStatement.getStatement()), + LogUtil.getError(e)); + resultBuilder + .parseTrue(false) + .error(error) + .explainTrue(false) + .type(jobStatement.getSqlType().getType()) + .sql(jobStatement.getStatement()) + .index(jobStatement.getIndex()); + log.error(error); + } finally { + resultBuilder.explainTime(LocalDateTime.now()); + return 
resultBuilder.build(); + } + } +} diff --git a/dinky-core/src/main/java/org/dinky/job/JobManager.java b/dinky-core/src/main/java/org/dinky/job/JobManager.java index 28df4f2ad9..04750abbaf 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobManager.java +++ b/dinky-core/src/main/java/org/dinky/job/JobManager.java @@ -264,7 +264,6 @@ public JobResult executeJarSql(String statement) throws Exception { jobJarRunner.run(jobStatement); } - JobJarStreamGraphBuilder.build(this).run(); if (job.isFailed()) { failed(); } else { @@ -295,10 +294,9 @@ public JobResult executeSql(String statement) throws Exception { Explainer.build(executor, useStatementSet, this).parseStatements(SqlUtil.getStatements(statement)); try { jobStatementPlan.buildFinalExecutableStatement(); - + JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this); for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) { - JobRunnerFactory.getJobRunner(jobStatement.getStatementType(), this) - .run(jobStatement); + jobRunnerFactory.getJobRunner(jobStatement.getStatementType()).run(jobStatement); } job.setEndTime(LocalDateTime.now()); @@ -369,10 +367,14 @@ public static SelectResult getJobData(String jobId) { } public ExplainResult explainSql(String statement) { + return Explainer.build(executor, useStatementSet, this).explainSql(statement); + } + + /*public ExplainResult explainSql(String statement) { return Explainer.build(executor, useStatementSet, this) .initialize(config, statement) .explainSql(statement); - } + }*/ public ObjectNode getStreamGraph(String statement) { return Explainer.build(executor, useStatementSet, this) diff --git a/dinky-core/src/main/java/org/dinky/job/JobRunner.java b/dinky-core/src/main/java/org/dinky/job/JobRunner.java index 109a5940c7..b682fad222 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/JobRunner.java @@ -19,7 +19,11 @@ package org.dinky.job; +import 
org.dinky.data.result.SqlExplainResult; + public interface JobRunner { void run(JobStatement jobStatement) throws Exception; + + SqlExplainResult explain(JobStatement jobStatement); } diff --git a/dinky-core/src/main/java/org/dinky/job/JobRunnerFactory.java b/dinky-core/src/main/java/org/dinky/job/JobRunnerFactory.java index aa1114bf67..fe550a287c 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobRunnerFactory.java +++ b/dinky-core/src/main/java/org/dinky/job/JobRunnerFactory.java @@ -28,25 +28,48 @@ import org.dinky.job.runner.JobUDFRunner; public class JobRunnerFactory { - public static JobRunner getJobRunner(JobStatementType jobStatementType, JobManager jobManager) { + + private JobSetRunner jobSetRunner; + private JobAddRunner jobAddRunner; + private JobSqlRunner jobSqlRunner; + private JobExecuteRunner jobExecuteRunner; + private JobUDFRunner jobUDFRunner; + private JobPrintRunner jobPrintRunner; + private JobDDLRunner jobDDLRunner; + + public JobRunnerFactory(JobManager jobManager) { + this.jobSetRunner = new JobSetRunner(jobManager); + this.jobAddRunner = new JobAddRunner(jobManager); + this.jobSqlRunner = new JobSqlRunner(jobManager); + this.jobExecuteRunner = new JobExecuteRunner(jobManager); + this.jobUDFRunner = new JobUDFRunner(jobManager); + this.jobPrintRunner = new JobPrintRunner(jobManager); + this.jobDDLRunner = new JobDDLRunner(jobManager); + } + + public JobRunner getJobRunner(JobStatementType jobStatementType) { switch (jobStatementType) { case SET: - return new JobSetRunner(jobManager); + return jobSetRunner; case ADD: case ADD_FILE: case ADD_JAR: - return new JobAddRunner(jobManager); + return jobAddRunner; case SQL: - return new JobSqlRunner(jobManager); + return jobSqlRunner; case EXECUTE: - return new JobExecuteRunner(jobManager); + return jobExecuteRunner; case UDF: - return new JobUDFRunner(jobManager); + return jobUDFRunner; case PRINT: - return new JobPrintRunner(jobManager); + return jobPrintRunner; case DDL: default: - return new 
JobDDLRunner(jobManager); + return jobDDLRunner; } } + + public static JobRunnerFactory create(JobManager jobManager) { + return new JobRunnerFactory(jobManager); + } } diff --git a/dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java b/dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java index 665a84ae81..dbb6e78df3 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java +++ b/dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java @@ -19,6 +19,8 @@ package org.dinky.job; +import org.dinky.parser.SqlType; + import java.util.ArrayList; import java.util.List; @@ -32,12 +34,12 @@ public List getJobStatementList() { return jobStatementList; } - public void addJobStatement(String statement, JobStatementType statementType) { - jobStatementList.add(new JobStatement(jobStatementList.size() + 1, statement, statementType, false)); + public void addJobStatement(String statement, JobStatementType statementType, SqlType sqlType) { + jobStatementList.add(new JobStatement(jobStatementList.size() + 1, statement, statementType, sqlType, false)); } - public void addJobStatementGenerated(String statement, JobStatementType statementType) { - jobStatementList.add(new JobStatement(jobStatementList.size() + 1, statement, statementType, true)); + public void addJobStatementGenerated(String statement, JobStatementType statementType, SqlType sqlType) { + jobStatementList.add(new JobStatement(jobStatementList.size() + 1, statement, statementType, sqlType, true)); } public void buildFinalExecutableStatement() { diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobAddRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobAddRunner.java index 66b2b46869..fc0e51bd72 100644 --- a/dinky-core/src/main/java/org/dinky/job/runner/JobAddRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobAddRunner.java @@ -20,8 +20,8 @@ package org.dinky.job.runner; import org.dinky.executor.CustomTableEnvironment; +import 
org.dinky.job.AbstractJobRunner; import org.dinky.job.JobManager; -import org.dinky.job.JobRunner; import org.dinky.job.JobStatement; import org.dinky.trans.parse.AddFileSqlParseStrategy; import org.dinky.trans.parse.AddJarSqlParseStrategy; @@ -30,9 +30,10 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; -public class JobAddRunner implements JobRunner { +import lombok.extern.slf4j.Slf4j; - private JobManager jobManager; +@Slf4j +public class JobAddRunner extends AbstractJobRunner { public JobAddRunner(JobManager jobManager) { this.jobManager = jobManager; diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobDDLRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobDDLRunner.java index dfd958d9a1..7893f5f1a6 100644 --- a/dinky-core/src/main/java/org/dinky/job/runner/JobDDLRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobDDLRunner.java @@ -19,13 +19,21 @@ package org.dinky.job.runner; +import org.dinky.assertion.Asserts; +import org.dinky.data.result.SqlExplainResult; +import org.dinky.job.AbstractJobRunner; import org.dinky.job.JobManager; -import org.dinky.job.JobRunner; import org.dinky.job.JobStatement; +import org.dinky.utils.LogUtil; +import org.dinky.utils.SqlUtil; -public class JobDDLRunner implements JobRunner { +import java.time.LocalDateTime; - private JobManager jobManager; +import cn.hutool.core.text.StrFormatter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class JobDDLRunner extends AbstractJobRunner { public JobDDLRunner(JobManager jobManager) { this.jobManager = jobManager; @@ -35,4 +43,38 @@ public JobDDLRunner(JobManager jobManager) { public void run(JobStatement jobStatement) throws Exception { jobManager.getExecutor().executeSql(jobStatement.getStatement()); } + + @Override + public SqlExplainResult explain(JobStatement jobStatement) { + SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); + try { + SqlExplainResult 
recordResult = jobManager.getExecutor().explainSqlRecord(jobStatement.getStatement()); + if (Asserts.isNull(recordResult)) { + return resultBuilder.isSkipped().build(); + } + resultBuilder = SqlExplainResult.newBuilder(recordResult); + // Flink DDL needs to execute to create catalog. + run(jobStatement); + resultBuilder + .explainTrue(true) + .type(jobStatement.getSqlType().getType()) + .sql(jobStatement.getStatement()) + .index(jobStatement.getIndex()); + } catch (Exception e) { + String error = StrFormatter.format( + "Exception in explaining FlinkSQL:\n{}\n{}", + SqlUtil.addLineNumber(jobStatement.getStatement()), + LogUtil.getError(e)); + resultBuilder + .error(error) + .explainTrue(false) + .type(jobStatement.getSqlType().getType()) + .sql(jobStatement.getStatement()) + .index(jobStatement.getIndex()); + log.error(error); + } finally { + resultBuilder.explainTime(LocalDateTime.now()); + return resultBuilder.build(); + } + } } diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobExecuteRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobExecuteRunner.java index 0ab3282dd2..1e462c978a 100644 --- a/dinky-core/src/main/java/org/dinky/job/runner/JobExecuteRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobExecuteRunner.java @@ -23,13 +23,17 @@ import org.dinky.data.result.IResult; import org.dinky.data.result.InsertResult; import org.dinky.data.result.ResultBuilder; +import org.dinky.data.result.SqlExplainResult; import org.dinky.gateway.Gateway; import org.dinky.gateway.result.GatewayResult; +import org.dinky.job.AbstractJobRunner; import org.dinky.job.Job; import org.dinky.job.JobManager; -import org.dinky.job.JobRunner; import org.dinky.job.JobStatement; import org.dinky.parser.SqlType; +import org.dinky.utils.FlinkStreamEnvironmentUtil; +import org.dinky.utils.LogUtil; +import org.dinky.utils.SqlUtil; import org.dinky.utils.URLUtils; import org.apache.flink.core.execution.JobClient; @@ -37,16 +41,21 @@ import 
org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.streaming.api.graph.StreamGraph; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; -public class JobExecuteRunner implements JobRunner { +import cn.hutool.core.text.StrFormatter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class JobExecuteRunner extends AbstractJobRunner { - private JobManager jobManager; private List statements; public JobExecuteRunner(JobManager jobManager) { this.jobManager = jobManager; + this.statements = new ArrayList<>(); } @Override @@ -62,6 +71,49 @@ public void run(JobStatement jobStatement) throws Exception { } } + @Override + public SqlExplainResult explain(JobStatement jobStatement) { + SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); + + try { + statements.add(jobStatement.getStatement()); + jobManager.getExecutor().executeSql(jobStatement.getStatement()); + if (jobStatement.isFinalExecutableStatement()) { + resultBuilder + .explain(FlinkStreamEnvironmentUtil.getStreamingPlanAsJSON( + jobManager.getExecutor().getStreamGraph())) + .type(jobStatement.getSqlType().getType()) + .parseTrue(true) + .explainTrue(true) + .sql(jobStatement.getStatement()) + .index(jobStatement.getIndex()); + } else { + resultBuilder + .sql(getParsedSql()) + .type(jobStatement.getSqlType().getType()) + .index(jobStatement.getIndex()) + .parseTrue(true) + .explainTrue(true) + .isSkipped(); + } + } catch (Exception e) { + String error = StrFormatter.format( + "Exception in explaining FlinkSQL:\n{}\n{}", + SqlUtil.addLineNumber(jobStatement.getStatement()), + LogUtil.getError(e)); + resultBuilder + .error(error) + .explainTrue(false) + .type(jobStatement.getSqlType().getType()) + .sql(jobStatement.getStatement()) + .index(jobStatement.getIndex()); + log.error(error); + } finally { + resultBuilder.explainTime(LocalDateTime.now()); + return resultBuilder.build(); + } + } + private void processWithGateway() 
throws Exception { GatewayResult gatewayResult = null; jobManager.getConfig().addGatewayConfig(jobManager.getExecutor().getSetConfig()); diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java index 326436d559..4368313ba7 100644 --- a/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java @@ -23,12 +23,13 @@ import org.dinky.classloader.DinkyClassLoader; import org.dinky.data.exception.DinkyException; import org.dinky.data.result.InsertResult; +import org.dinky.data.result.SqlExplainResult; import org.dinky.gateway.Gateway; import org.dinky.gateway.config.GatewayConfig; import org.dinky.gateway.result.GatewayResult; +import org.dinky.job.AbstractJobRunner; import org.dinky.job.Job; import org.dinky.job.JobManager; -import org.dinky.job.JobRunner; import org.dinky.job.JobStatement; import org.dinky.parser.SqlType; import org.dinky.trans.Operations; @@ -40,6 +41,7 @@ import org.dinky.trans.parse.SetSqlParseStrategy; import org.dinky.utils.DinkyClassLoaderUtil; import org.dinky.utils.FlinkStreamEnvironmentUtil; +import org.dinky.utils.LogUtil; import org.dinky.utils.SqlUtil; import org.dinky.utils.URLUtils; @@ -55,17 +57,18 @@ import java.io.File; import java.net.URL; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.Set; import cn.hutool.core.lang.Assert; +import cn.hutool.core.text.StrFormatter; import lombok.extern.slf4j.Slf4j; @Slf4j -public class JobJarRunner implements JobRunner { +public class JobJarRunner extends AbstractJobRunner { - private JobManager jobManager; private final Configuration configuration; public JobJarRunner(JobManager jobManager) { @@ -77,13 +80,13 @@ public JobJarRunner(JobManager jobManager) { @Override public void run(JobStatement jobStatement) throws Exception { if (!jobManager.isUseGateway()) { - submitNormal(); + 
submitNormal(jobStatement); } else { GatewayResult gatewayResult; if (jobManager.getRunMode().isApplicationMode()) { - gatewayResult = submitGateway(); + gatewayResult = submitGateway(jobStatement); } else { - gatewayResult = submitNormalWithGateway(); + gatewayResult = submitNormalWithGateway(jobStatement); } jobManager.getJob().setResult(InsertResult.success(gatewayResult.getId())); jobManager.getJob().setJobId(gatewayResult.getId()); @@ -100,15 +103,47 @@ public void run(JobStatement jobStatement) throws Exception { } } - private GatewayResult submitGateway() throws Exception { - configuration.set(PipelineOptions.JARS, getUris(jobManager.getJob().getStatement())); + @Override + public SqlExplainResult explain(JobStatement jobStatement) { + SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); + + try { + // Execute task does not support statement set. + Pipeline pipeline = getPipeline(jobStatement); + resultBuilder + .explain(FlinkStreamEnvironmentUtil.getStreamingPlanAsJSON(pipeline)) + .type(jobStatement.getSqlType().getType()) + .parseTrue(true) + .explainTrue(true) + .sql(jobStatement.getStatement()) + .index(jobStatement.getIndex()); + } catch (Exception e) { + String error = StrFormatter.format( + "Exception in explaining FlinkSQL:\n{}\n{}", + SqlUtil.addLineNumber(jobStatement.getStatement()), + LogUtil.getError(e)); + resultBuilder + .error(error) + .explainTrue(false) + .type(jobStatement.getSqlType().getType()) + .sql(jobStatement.getStatement()) + .index(jobStatement.getIndex()); + log.error(error); + } finally { + resultBuilder.explainTime(LocalDateTime.now()); + return resultBuilder.build(); + } + } + + private GatewayResult submitGateway(JobStatement jobStatement) throws Exception { + configuration.set(PipelineOptions.JARS, getUris(jobStatement.getStatement())); jobManager.getConfig().addGatewayConfig(configuration); - jobManager.getConfig().getGatewayConfig().setSql(jobManager.getJob().getStatement()); + 
jobManager.getConfig().getGatewayConfig().setSql(jobStatement.getStatement()); return Gateway.build(jobManager.getConfig().getGatewayConfig()).submitJar(jobManager.getUdfPathContextHolder()); } - private GatewayResult submitNormalWithGateway() { - Pipeline pipeline = getPipeline(); + private GatewayResult submitNormalWithGateway(JobStatement jobStatement) { + Pipeline pipeline = getPipeline(jobStatement); if (pipeline instanceof StreamGraph) { ((StreamGraph) pipeline).setJobName(jobManager.getConfig().getJobName()); } else if (pipeline instanceof Plan) { @@ -116,7 +151,7 @@ private GatewayResult submitNormalWithGateway() { } JobGraph jobGraph = FlinkStreamEnvironmentUtil.getJobGraph(pipeline, configuration); GatewayConfig gatewayConfig = jobManager.getConfig().getGatewayConfig(); - List uriList = getUris(jobManager.getJob().getStatement()); + List uriList = getUris(jobStatement.getStatement()); String[] jarPaths = uriList.stream() .map(URLUtils::toFile) .map(File::getAbsolutePath) @@ -125,8 +160,8 @@ private GatewayResult submitNormalWithGateway() { return Gateway.build(gatewayConfig).submitJobGraph(jobGraph); } - private Pipeline getPipeline() { - Pipeline pipeline = getJarStreamGraph(jobManager.getJob().getStatement(), jobManager.getDinkyClassLoader()); + private Pipeline getPipeline(JobStatement jobStatement) { + Pipeline pipeline = getJarStreamGraph(jobStatement.getStatement(), jobManager.getDinkyClassLoader()); if (pipeline instanceof StreamGraph) { if (Asserts.isNotNullString(jobManager.getConfig().getSavePointPath())) { ((StreamGraph) pipeline) @@ -138,9 +173,9 @@ private Pipeline getPipeline() { return pipeline; } - private void submitNormal() throws Exception { + private void submitNormal(JobStatement jobStatement) throws Exception { JobClient jobClient = FlinkStreamEnvironmentUtil.executeAsync( - getPipeline(), jobManager.getExecutor().getStreamExecutionEnvironment()); + getPipeline(jobStatement), jobManager.getExecutor().getStreamExecutionEnvironment()); 
if (Asserts.isNotNull(jobClient)) { jobManager.getJob().setJobId(jobClient.getJobID().toHexString()); jobManager.getJob().setJids(new ArrayList() { diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobPrintRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobPrintRunner.java index 71c1df7273..121fe4e873 100644 --- a/dinky-core/src/main/java/org/dinky/job/runner/JobPrintRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobPrintRunner.java @@ -19,19 +19,25 @@ package org.dinky.job.runner; +import org.dinky.data.result.SqlExplainResult; import org.dinky.explainer.print_table.PrintStatementExplainer; +import org.dinky.job.AbstractJobRunner; import org.dinky.job.JobManager; -import org.dinky.job.JobRunner; import org.dinky.job.JobStatement; import org.dinky.job.JobStatementType; import org.dinky.parser.SqlType; import org.dinky.utils.IpUtil; +import org.dinky.utils.LogUtil; +import org.dinky.utils.SqlUtil; +import java.time.LocalDateTime; import java.util.Map; -public class JobPrintRunner implements JobRunner { +import cn.hutool.core.text.StrFormatter; +import lombok.extern.slf4j.Slf4j; - private JobManager jobManager; +@Slf4j +public class JobPrintRunner extends AbstractJobRunner { public JobPrintRunner(JobManager jobManager) { this.jobManager = jobManager; @@ -52,4 +58,43 @@ public void run(JobStatement jobStatement) throws Exception { jobSqlRunner.run(ctasJobStatement); } } + + @Override + public SqlExplainResult explain(JobStatement jobStatement) { + SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); + try { + Map config = + jobManager.getExecutor().getExecutorConfig().getConfig(); + String host = config.getOrDefault("dinky.dinkyHost", IpUtil.getHostIp()); + int port = Integer.parseInt(config.getOrDefault("dinky.dinkyPrintPort", "7125")); + String[] tableNames = PrintStatementExplainer.getTableNames(jobStatement.getStatement()); + for (String tableName : tableNames) { + String ctasStatement = 
PrintStatementExplainer.getCreateStatement(tableName, host, port); + JobStatement ctasJobStatement = JobStatement.generateJobStatement( + jobStatement.getIndex(), ctasStatement, JobStatementType.SQL, SqlType.CTAS); + JobSqlRunner jobSqlRunner = new JobSqlRunner(jobManager); + jobSqlRunner.explain(ctasJobStatement); + } + resultBuilder + .explainTrue(true) + .type(jobStatement.getSqlType().getType()) + .sql(jobStatement.getStatement()) + .index(jobStatement.getIndex()); + } catch (Exception e) { + String error = StrFormatter.format( + "Exception in explaining FlinkSQL:\n{}\n{}", + SqlUtil.addLineNumber(jobStatement.getStatement()), + LogUtil.getError(e)); + resultBuilder + .error(error) + .explainTrue(false) + .type(jobStatement.getSqlType().getType()) + .sql(jobStatement.getStatement()) + .index(jobStatement.getIndex()); + log.error(error); + } finally { + resultBuilder.explainTime(LocalDateTime.now()); + return resultBuilder.build(); + } + } } diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobSetRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobSetRunner.java index 095eafdb8a..efe0345d0c 100644 --- a/dinky-core/src/main/java/org/dinky/job/runner/JobSetRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobSetRunner.java @@ -19,14 +19,15 @@ package org.dinky.job.runner; +import org.dinky.job.AbstractJobRunner; import org.dinky.job.JobManager; -import org.dinky.job.JobRunner; import org.dinky.job.JobStatement; import org.dinky.trans.ddl.CustomSetOperation; -public class JobSetRunner implements JobRunner { +import lombok.extern.slf4j.Slf4j; - private JobManager jobManager; +@Slf4j +public class JobSetRunner extends AbstractJobRunner { public JobSetRunner(JobManager jobManager) { this.jobManager = jobManager; diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java index 1725ade2bd..d02fc3054a 100644 --- 
a/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java @@ -24,38 +24,49 @@ import org.dinky.data.result.IResult; import org.dinky.data.result.InsertResult; import org.dinky.data.result.ResultBuilder; +import org.dinky.data.result.SqlExplainResult; import org.dinky.executor.Executor; import org.dinky.gateway.Gateway; import org.dinky.gateway.result.GatewayResult; import org.dinky.interceptor.FlinkInterceptor; import org.dinky.interceptor.FlinkInterceptorResult; +import org.dinky.job.AbstractJobRunner; import org.dinky.job.Job; import org.dinky.job.JobConfig; import org.dinky.job.JobManager; -import org.dinky.job.JobRunner; import org.dinky.job.JobStatement; import org.dinky.parser.SqlType; +import org.dinky.utils.LogUtil; +import org.dinky.utils.SqlUtil; import org.dinky.utils.URLUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.operations.ModifyOperation; +import org.apache.flink.table.operations.Operation; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.UUID; -public class JobSqlRunner implements JobRunner { +import cn.hutool.core.text.StrFormatter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class JobSqlRunner extends AbstractJobRunner { - private JobManager jobManager; - private List insertStatements; private List modifyOperations; + private List statements; + private List operations; public JobSqlRunner(JobManager jobManager) { this.jobManager = jobManager; this.modifyOperations = new ArrayList<>(); + this.statements = new ArrayList<>(); + this.operations = new ArrayList<>(); } @Override @@ -67,6 +78,46 @@ public void run(JobStatement jobStatement) throws Exception { } } + @Override + public SqlExplainResult explain(JobStatement jobStatement) 
{ + SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); + + try { + ModifyOperation modifyOperation = + jobManager.getExecutor().getModifyOperationFromInsert(jobStatement.getStatement()); + operations.add(modifyOperation); + statements.add(jobStatement.getStatement()); + if (jobStatement.isFinalExecutableStatement()) { + SqlExplainResult sqlExplainResult = jobManager.getExecutor().explainOperation(operations); + resultBuilder = SqlExplainResult.newBuilder(sqlExplainResult); + resultBuilder.sql(getParsedSql()).index(jobStatement.getIndex()); + } else { + resultBuilder + .sql(getParsedSql()) + .type(jobStatement.getSqlType().getType()) + .parseTrue(true) + .explainTrue(true) + .index(jobStatement.getIndex()) + .isSkipped(); + } + } catch (Exception e) { + String error = StrFormatter.format( + "Exception in explaining FlinkSQL:\n{}\n{}", + SqlUtil.addLineNumber(jobStatement.getStatement()), + LogUtil.getError(e)); + resultBuilder + .error(error) + .explainTrue(false) + .type(jobStatement.getSqlType().getType()) + .sql(jobStatement.getStatement()) + .index(jobStatement.getIndex()); + log.error(error); + } finally { + resultBuilder.explainTime(LocalDateTime.now()); + return resultBuilder.build(); + } + } + private void handleStatementSet(JobStatement jobStatement) throws Exception { if (jobManager.isUseGateway()) { processWithGateway(jobStatement); @@ -94,7 +145,7 @@ private void processWithoutGateway(JobStatement jobStatement) throws Exception { ModifyOperation modifyOperation = jobManager.getExecutor().getModifyOperationFromInsert(jobStatement.getStatement()); modifyOperations.add(modifyOperation); - insertStatements.add(jobStatement.getStatement()); + statements.add(jobStatement.getStatement()); if (jobStatement.isFinalExecutableStatement()) { TableResult tableResult = jobManager.getExecutor().executeModifyOperations(modifyOperations); updateJobWithTableResult(tableResult); @@ -166,7 +217,7 @@ private GatewayResult 
submitByGateway(JobStatement jobStatement) { GatewayType runMode = jobManager.getRunMode(); Executor executor = jobManager.getExecutor(); - insertStatements.add(jobStatement.getStatement()); + statements.add(jobStatement.getStatement()); // Use gateway need to build gateway config, include flink configuration. config.addGatewayConfig(executor.getCustomTableEnvironment().getConfig().getConfiguration()); @@ -198,11 +249,11 @@ private GatewayResult submitByGateway(JobStatement jobStatement) { private String getParsedSql() { StringBuilder sb = new StringBuilder(); - for (String insertStatement : insertStatements) { + for (String statement : statements) { if (sb.length() > 0) { sb.append(";\n"); } - sb.append(insertStatement); + sb.append(statement); } return sb.toString(); } diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobUDFRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobUDFRunner.java index 71c8d3eef2..b71a09b88a 100644 --- a/dinky-core/src/main/java/org/dinky/job/runner/JobUDFRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobUDFRunner.java @@ -26,8 +26,8 @@ import org.dinky.data.model.SystemConfiguration; import org.dinky.function.data.model.UDF; import org.dinky.function.util.UDFUtil; +import org.dinky.job.AbstractJobRunner; import org.dinky.job.JobManager; -import org.dinky.job.JobRunner; import org.dinky.job.JobStatement; import org.dinky.utils.URLUtils; @@ -44,9 +44,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j -public class JobUDFRunner implements JobRunner { - - private JobManager jobManager; +public class JobUDFRunner extends AbstractJobRunner { public JobUDFRunner(JobManager jobManager) { this.jobManager = jobManager;