From e50a3dc13673bd37ee851833b3234b078cf2e80b Mon Sep 17 00:00:00 2001 From: wenmo <32723967+wenmo@users.noreply.github.com> Date: Sat, 16 Dec 2023 23:21:50 +0800 Subject: [PATCH] [Optimization-2664][core] Optimize udf and error info --- .../AbstractCustomTableEnvironment.java | 3 ++ .../executor/CustomTableEnvironmentImpl.java | 47 +++++++++++++++---- .../AbstractCustomTableEnvironment.java | 3 ++ .../executor/CustomTableEnvironmentImpl.java | 45 ++++++++++++++---- .../executor/CustomTableEnvironmentImpl.java | 8 +++- .../executor/CustomTableEnvironmentImpl.java | 8 +++- .../executor/CustomTableEnvironmentImpl.java | 8 +++- .../executor/CustomTableEnvironment.java | 7 +++ .../org/dinky/executor/AppBatchExecutor.java | 2 +- .../dinky/executor/LocalBatchExecutor.java | 2 +- .../dinky/executor/RemoteBatchExecutor.java | 2 +- .../java/org/dinky/function/util/UDFUtil.java | 6 +-- .../BottomContainer/Result/index.tsx | 2 +- .../HeaderContainer/Explain/index.tsx | 5 +- .../DataStudio/HeaderContainer/service.tsx | 9 ++-- dinky-web/src/services/BusinessCrud.ts | 1 + 16 files changed, 116 insertions(+), 42 deletions(-) diff --git a/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/AbstractCustomTableEnvironment.java b/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/AbstractCustomTableEnvironment.java index 82f805783c..78922202fe 100644 --- a/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/AbstractCustomTableEnvironment.java +++ b/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/AbstractCustomTableEnvironment.java @@ -19,6 +19,7 @@ package org.dinky.executor; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -58,4 +59,6 @@ public ClassLoader getUserClassLoader() { public Planner getPlanner() { return ((StreamTableEnvironmentImpl) streamTableEnvironment).getPlanner(); } + + public abstract void addConfiguration(ConfigOption option, T value); } diff --git a/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java index 9cbbd97477..e206e120dc 100644 --- a/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java +++ b/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.PipelineOptions; @@ -71,6 +72,7 @@ import org.apache.flink.types.Row; import java.io.File; +import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URL; @@ -124,22 +126,27 @@ public CustomTableEnvironmentImpl( public static CustomTableEnvironmentImpl create( StreamExecutionEnvironment executionEnvironment, ClassLoader classLoader) { - return create(executionEnvironment, EnvironmentSettings.newInstance().build(), TableConfig.getDefault()); + return create( + executionEnvironment, EnvironmentSettings.newInstance().build(), TableConfig.getDefault(), classLoader); } - public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) { + public static CustomTableEnvironmentImpl createBatch( + StreamExecutionEnvironment executionEnvironment, ClassLoader classLoader) { Configuration configuration = new Configuration(); configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH); TableConfig tableConfig = new TableConfig(); tableConfig.addConfiguration(configuration); - return create(executionEnvironment, EnvironmentSettings.inBatchMode(), tableConfig); + return create(executionEnvironment, EnvironmentSettings.inBatchMode(), tableConfig, classLoader); } public static CustomTableEnvironmentImpl create( - StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings, TableConfig tableConfig) { + StreamExecutionEnvironment executionEnvironment, + EnvironmentSettings settings, + TableConfig tableConfig, + ClassLoader classLoader) { // temporary solution until FLINK-15635 is fixed - final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + // final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); final ModuleManager moduleManager = new ModuleManager(); @@ -226,15 +233,37 @@ public ObjectNode getStreamGraph(String statement) { @Override public void addJar(File... jarPath) { - Configuration configuration = new Configuration(this.getRootConfiguration()); + Configuration configuration = + (Configuration) getStreamExecutionEnvironment().getConfiguration(); List pathList = Arrays.stream(URLUtil.getURLs(jarPath)).map(URL::toString).collect(Collectors.toList()); List jars = configuration.get(PipelineOptions.JARS); - if (jars == null) { - configuration.set(PipelineOptions.JARS, pathList); - } else { + if (jars != null) { CollUtil.addAll(jars, pathList); } + Map flinkConfigurationMap = getFlinkConfigurationMap(); + flinkConfigurationMap.put(PipelineOptions.JARS.key(), jars); + } + + @Override + public void addConfiguration(ConfigOption option, T value) { + Map flinkConfigurationMap = getFlinkConfigurationMap(); + flinkConfigurationMap.put(option.key(), value); + } + + private Map getFlinkConfigurationMap() { + Field configuration = null; + try { + configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration"); + configuration.setAccessible(true); + Configuration o = (Configuration) configuration.get(getStreamExecutionEnvironment()); + Field confData = Configuration.class.getDeclaredField("confData"); + confData.setAccessible(true); + Map temp = (Map) confData.get(o); + return temp; + } catch (Exception e) { + throw new RuntimeException(e); + } } @Override diff --git a/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/AbstractCustomTableEnvironment.java b/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/AbstractCustomTableEnvironment.java index 6a074a2873..853ca1cff7 100644 --- a/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/AbstractCustomTableEnvironment.java +++ b/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/AbstractCustomTableEnvironment.java @@ -19,6 +19,7 @@ package org.dinky.executor; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableEnvironment; @@ -61,4 +62,6 @@ public ClassLoader getUserClassLoader() { public Configuration getRootConfiguration() { return (Configuration) this.getConfig().getRootConfiguration(); } + + public abstract void addConfiguration(ConfigOption option, T value); } diff --git a/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java index 4b1b9d02a6..face57465b 100644 --- a/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java +++ b/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.PipelineOptions; @@ -64,6 +65,7 @@ import org.apache.flink.types.Row; import java.io.File; +import java.lang.reflect.Field; import java.net.URL; import java.util.ArrayList; import java.util.Arrays; @@ -112,24 +114,26 @@ public CustomTableEnvironmentImpl( public static CustomTableEnvironmentImpl create( StreamExecutionEnvironment executionEnvironment, ClassLoader classLoader) { - return create(executionEnvironment, EnvironmentSettings.newInstance().build()); + return create(executionEnvironment, EnvironmentSettings.newInstance().build(), classLoader); } - public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) { + public static CustomTableEnvironmentImpl createBatch( + StreamExecutionEnvironment executionEnvironment, ClassLoader classLoader) { Configuration configuration = new Configuration(); configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH); TableConfig tableConfig = new TableConfig(); tableConfig.addConfiguration(configuration); return create( executionEnvironment, - EnvironmentSettings.newInstance().inBatchMode().build()); + EnvironmentSettings.newInstance().inBatchMode().build(), + classLoader); } public static CustomTableEnvironmentImpl create( - StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) { + StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings, ClassLoader classLoader) { // temporary solution until FLINK-15635 is fixed - final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + // final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); final Executor executor = lookupExecutor(classLoader, executionEnvironment); @@ -198,17 +202,38 @@ public ObjectNode getStreamGraph(String statement) { } } - @Override public void addJar(File... jarPath) { - Configuration configuration = new Configuration(this.getRootConfiguration()); + Configuration configuration = + (Configuration) getStreamExecutionEnvironment().getConfiguration(); List pathList = Arrays.stream(URLUtil.getURLs(jarPath)).map(URL::toString).collect(Collectors.toList()); List jars = configuration.get(PipelineOptions.JARS); - if (jars == null) { - configuration.set(PipelineOptions.JARS, pathList); - } else { + if (jars != null) { CollUtil.addAll(jars, pathList); } + Map flinkConfigurationMap = getFlinkConfigurationMap(); + flinkConfigurationMap.put(PipelineOptions.JARS.key(), jars); + } + + @Override + public void addConfiguration(ConfigOption option, T value) { + Map flinkConfigurationMap = getFlinkConfigurationMap(); + flinkConfigurationMap.put(option.key(), value); + } + + private Map getFlinkConfigurationMap() { + Field configuration = null; + try { + configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration"); + configuration.setAccessible(true); + Configuration o = (Configuration) configuration.get(getStreamExecutionEnvironment()); + Field confData = Configuration.class.getDeclaredField("confData"); + confData.setAccessible(true); + Map temp = (Map) confData.get(o); + return temp; + } catch (Exception e) { + throw new RuntimeException(e); + } } @Override diff --git a/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java index d08b0119c6..32df8a3785 100644 --- a/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java +++ b/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java @@ -83,10 +83,14 @@ public static CustomTableEnvironmentImpl create( EnvironmentSettings.newInstance().withClassLoader(classLoader).build()); } - public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) { + public static CustomTableEnvironmentImpl createBatch( + StreamExecutionEnvironment executionEnvironment, ClassLoader classLoader) { return create( executionEnvironment, - EnvironmentSettings.newInstance().inBatchMode().build()); + EnvironmentSettings.newInstance() + .withClassLoader(classLoader) + .inBatchMode() + .build()); } public static CustomTableEnvironmentImpl create( diff --git a/dinky-client/dinky-client-1.17/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.17/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java index 1889ba5d39..bcaf2ad6cc 100644 --- a/dinky-client/dinky-client-1.17/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java +++ b/dinky-client/dinky-client-1.17/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java @@ -86,10 +86,14 @@ public static CustomTableEnvironmentImpl create( EnvironmentSettings.newInstance().withClassLoader(classLoader).build()); } - public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) { + public static CustomTableEnvironmentImpl createBatch( + StreamExecutionEnvironment executionEnvironment, ClassLoader classLoader) { return create( executionEnvironment, - EnvironmentSettings.newInstance().inBatchMode().build()); + EnvironmentSettings.newInstance() + .withClassLoader(classLoader) + .inBatchMode() + .build()); } public static CustomTableEnvironmentImpl create( diff --git a/dinky-client/dinky-client-1.18/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.18/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java index 90020a5cd2..4f7fc58140 100644 --- a/dinky-client/dinky-client-1.18/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java +++ b/dinky-client/dinky-client-1.18/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java @@ -87,10 +87,14 @@ public static CustomTableEnvironmentImpl create( EnvironmentSettings.newInstance().withClassLoader(classLoader).build()); } - public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) { + public static CustomTableEnvironmentImpl createBatch( + StreamExecutionEnvironment executionEnvironment, ClassLoader classLoader) { return create( executionEnvironment, - EnvironmentSettings.newInstance().inBatchMode().build()); + EnvironmentSettings.newInstance() + .withClassLoader(classLoader) + .inBatchMode() + .build()); } public static CustomTableEnvironmentImpl create( diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java index 7feb1ad98e..aa25ef2406 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java @@ -22,6 +22,7 @@ import org.dinky.data.model.LineageRel; import org.dinky.data.result.SqlExplainResult; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -93,4 +94,10 @@ default void addJar(File... jarPath) { CollUtil.addAll(jars, pathList); } } + + default void addConfiguration(ConfigOption option, T value) { + Configuration configuration = + (Configuration) getStreamExecutionEnvironment().getConfiguration(); + configuration.set(option, value); + } } diff --git a/dinky-core/src/main/java/org/dinky/executor/AppBatchExecutor.java b/dinky-core/src/main/java/org/dinky/executor/AppBatchExecutor.java index 92bfb77c23..6d163611a0 100644 --- a/dinky-core/src/main/java/org/dinky/executor/AppBatchExecutor.java +++ b/dinky-core/src/main/java/org/dinky/executor/AppBatchExecutor.java @@ -44,6 +44,6 @@ public AppBatchExecutor(ExecutorConfig executorConfig, DinkyClassLoader classLoa @Override CustomTableEnvironment createCustomTableEnvironment(ClassLoader classLoader) { - return CustomTableEnvironmentImpl.createBatch(environment); + return CustomTableEnvironmentImpl.createBatch(environment, classLoader); } } diff --git a/dinky-core/src/main/java/org/dinky/executor/LocalBatchExecutor.java b/dinky-core/src/main/java/org/dinky/executor/LocalBatchExecutor.java index 142bd68f55..3a56e8c413 100644 --- a/dinky-core/src/main/java/org/dinky/executor/LocalBatchExecutor.java +++ b/dinky-core/src/main/java/org/dinky/executor/LocalBatchExecutor.java @@ -63,6 +63,6 @@ public LocalBatchExecutor(ExecutorConfig executorConfig, DinkyClassLoader classL @Override CustomTableEnvironment createCustomTableEnvironment(ClassLoader classLoader) { - return CustomTableEnvironmentImpl.createBatch(environment); + return CustomTableEnvironmentImpl.createBatch(environment, classLoader); } } diff --git a/dinky-core/src/main/java/org/dinky/executor/RemoteBatchExecutor.java b/dinky-core/src/main/java/org/dinky/executor/RemoteBatchExecutor.java index 72aeb228dc..2562932846 100644 --- a/dinky-core/src/main/java/org/dinky/executor/RemoteBatchExecutor.java +++ b/dinky-core/src/main/java/org/dinky/executor/RemoteBatchExecutor.java @@ -46,6 +46,6 @@ public RemoteBatchExecutor(ExecutorConfig executorConfig, DinkyClassLoader class @Override CustomTableEnvironment createCustomTableEnvironment(ClassLoader classLoader) { - return CustomTableEnvironmentImpl.createBatch(environment); + return CustomTableEnvironmentImpl.createBatch(environment, classLoader); } } diff --git a/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java b/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java index 7ed6ec6a4c..9350b83aaf 100644 --- a/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java +++ b/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java @@ -453,12 +453,10 @@ private static List execPyAndGetUdfNameList(String pyPath, String pyFile public static void addConfigurationClsAndJars( CustomTableEnvironment customTableEnvironment, List jarList, List classpaths) { - Configuration configuration = (Configuration) - customTableEnvironment.getStreamExecutionEnvironment().getConfiguration(); - configuration.set( + customTableEnvironment.addConfiguration( PipelineOptions.CLASSPATHS, classpaths.stream().map(URL::toString).collect(Collectors.toList())); - configuration.set( + customTableEnvironment.addConfiguration( PipelineOptions.JARS, jarList.stream().map(URL::toString).collect(Collectors.toList())); } diff --git a/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx b/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx index 152bf6e88c..d81c939b3f 100644 --- a/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx +++ b/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx @@ -131,7 +131,7 @@ const Result = (props: any) => { if (consoleData.result && !isRefresh) { setData(consoleData.result); } else { - if (current.dialect == DIALECT.FLINK_SQL) { + if (current.dialect && current.dialect.toLowerCase() == DIALECT.FLINK_SQL) { // flink sql // to do: get job data by history id list, not flink jid if (current.id) { diff --git a/dinky-web/src/pages/DataStudio/HeaderContainer/Explain/index.tsx b/dinky-web/src/pages/DataStudio/HeaderContainer/Explain/index.tsx index 223a8f9d08..91d0bc9083 100644 --- a/dinky-web/src/pages/DataStudio/HeaderContainer/Explain/index.tsx +++ b/dinky-web/src/pages/DataStudio/HeaderContainer/Explain/index.tsx @@ -64,17 +64,14 @@ const Explain: React.FC = (props: any) => { // if (selectsql == null || selectsql == '') { // selectsql = current.value; // } - // let useSession = !!currentSession.session; let param = { ...current, - // useSession: useSession, - // session: currentSession.session, configJson: current?.config, taskId: current?.id }; setResult({l('pages.datastudio.explain.validate')}); setExplainData([]); - const result = explainSql(param); + const result = explainSql(l('pages.datastudio.editor.checking', '', { jobName: current?.name }),param); result.then((res) => { const errorExplainData: [] = []; let errorCount: number = 0; diff --git a/dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx b/dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx index bee5d82c0b..4290be5c0a 100644 --- a/dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx +++ b/dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx @@ -17,12 +17,11 @@ * */ -import { postAll } from '@/services/api'; -import { handleGetOption, handleOption } from '@/services/BusinessCrud'; +import {handleGetOption, handleOption} from '@/services/BusinessCrud'; import { DIALECT } from '@/services/constants'; -export async function explainSql(params: any) { - return postAll('/api/task/explainSql', params); +export async function explainSql(title: string, params: any) { + return handleOption('/api/task/explainSql', title, params); } export async function getJobPlan(title: string, params: any) { @@ -30,7 +29,7 @@ export async function getJobPlan(title: string, params: any) { } export async function debugTask(title: string, params: any) { - return postAll('/api/task/debugTask', params); + return handleOption('/api/task/debugTask', title, params); } export async function executeSql(title: string, id: number) { diff --git a/dinky-web/src/services/BusinessCrud.ts b/dinky-web/src/services/BusinessCrud.ts index a684ed9b50..cf06314879 100644 --- a/dinky-web/src/services/BusinessCrud.ts +++ b/dinky-web/src/services/BusinessCrud.ts @@ -246,6 +246,7 @@ export const handlePutData = async (url: string, fields: any) => { return false; } }; + export const handlePutDataJson = async (url: string, fields: any) => { const tipsTitle = fields?.id ? l('app.request.update') : l('app.request.add'); await LoadingMessageAsync(l('app.request.running') + tipsTitle);