Skip to content

Commit

Permalink
[Optimization-2664][core] Optimize udf and error info
Browse files Browse the repository at this point in the history
  • Loading branch information
aiwenmo committed Dec 16, 2023
1 parent b8e6a44 commit e50a3dc
Show file tree
Hide file tree
Showing 16 changed files with 116 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.dinky.executor;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
Expand Down Expand Up @@ -58,4 +59,6 @@ public ClassLoader getUserClassLoader() {
public Planner getPlanner() {
return ((StreamTableEnvironmentImpl) streamTableEnvironment).getPlanner();
}

public abstract <T> void addConfiguration(ConfigOption<T> option, T value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
Expand Down Expand Up @@ -71,6 +72,7 @@
import org.apache.flink.types.Row;

import java.io.File;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
Expand Down Expand Up @@ -124,22 +126,27 @@ public CustomTableEnvironmentImpl(

public static CustomTableEnvironmentImpl create(
StreamExecutionEnvironment executionEnvironment, ClassLoader classLoader) {
return create(executionEnvironment, EnvironmentSettings.newInstance().build(), TableConfig.getDefault());
return create(
executionEnvironment, EnvironmentSettings.newInstance().build(), TableConfig.getDefault(), classLoader);
}

public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) {
public static CustomTableEnvironmentImpl createBatch(
StreamExecutionEnvironment executionEnvironment, ClassLoader classLoader) {
Configuration configuration = new Configuration();
configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
TableConfig tableConfig = new TableConfig();
tableConfig.addConfiguration(configuration);
return create(executionEnvironment, EnvironmentSettings.inBatchMode(), tableConfig);
return create(executionEnvironment, EnvironmentSettings.inBatchMode(), tableConfig, classLoader);
}

public static CustomTableEnvironmentImpl create(
StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings, TableConfig tableConfig) {
StreamExecutionEnvironment executionEnvironment,
EnvironmentSettings settings,
TableConfig tableConfig,
ClassLoader classLoader) {

// temporary solution until FLINK-15635 is fixed
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

final ModuleManager moduleManager = new ModuleManager();

Expand Down Expand Up @@ -226,15 +233,37 @@ public ObjectNode getStreamGraph(String statement) {

@Override
public void addJar(File... jarPath) {
Configuration configuration = new Configuration(this.getRootConfiguration());
Configuration configuration =
(Configuration) getStreamExecutionEnvironment().getConfiguration();
List<String> pathList =
Arrays.stream(URLUtil.getURLs(jarPath)).map(URL::toString).collect(Collectors.toList());
List<String> jars = configuration.get(PipelineOptions.JARS);
if (jars == null) {
configuration.set(PipelineOptions.JARS, pathList);
} else {
if (jars != null) {
CollUtil.addAll(jars, pathList);
}
Map<String, Object> flinkConfigurationMap = getFlinkConfigurationMap();
flinkConfigurationMap.put(PipelineOptions.JARS.key(), jars);
}

@Override
public <T> void addConfiguration(ConfigOption<T> option, T value) {
Map<String, Object> flinkConfigurationMap = getFlinkConfigurationMap();
flinkConfigurationMap.put(option.key(), value);
}

private Map<String, Object> getFlinkConfigurationMap() {
Field configuration = null;
try {
configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration");
configuration.setAccessible(true);
Configuration o = (Configuration) configuration.get(getStreamExecutionEnvironment());
Field confData = Configuration.class.getDeclaredField("confData");
confData.setAccessible(true);
Map<String, Object> temp = (Map<String, Object>) confData.get(o);
return temp;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.dinky.executor;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
Expand Down Expand Up @@ -61,4 +62,6 @@ public ClassLoader getUserClassLoader() {
public Configuration getRootConfiguration() {
return (Configuration) this.getConfig().getRootConfiguration();
}

public abstract <T> void addConfiguration(ConfigOption<T> option, T value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
Expand Down Expand Up @@ -64,6 +65,7 @@
import org.apache.flink.types.Row;

import java.io.File;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -112,24 +114,26 @@ public CustomTableEnvironmentImpl(

public static CustomTableEnvironmentImpl create(
StreamExecutionEnvironment executionEnvironment, ClassLoader classLoader) {
return create(executionEnvironment, EnvironmentSettings.newInstance().build());
return create(executionEnvironment, EnvironmentSettings.newInstance().build(), classLoader);
}

public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) {
public static CustomTableEnvironmentImpl createBatch(
StreamExecutionEnvironment executionEnvironment, ClassLoader classLoader) {
Configuration configuration = new Configuration();
configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
TableConfig tableConfig = new TableConfig();
tableConfig.addConfiguration(configuration);
return create(
executionEnvironment,
EnvironmentSettings.newInstance().inBatchMode().build());
EnvironmentSettings.newInstance().inBatchMode().build(),
classLoader);
}

public static CustomTableEnvironmentImpl create(
StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings, ClassLoader classLoader) {

// temporary solution until FLINK-15635 is fixed
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

final Executor executor = lookupExecutor(classLoader, executionEnvironment);

Expand Down Expand Up @@ -198,17 +202,38 @@ public ObjectNode getStreamGraph(String statement) {
}
}

@Override
public void addJar(File... jarPath) {
Configuration configuration = new Configuration(this.getRootConfiguration());
Configuration configuration =
(Configuration) getStreamExecutionEnvironment().getConfiguration();
List<String> pathList =
Arrays.stream(URLUtil.getURLs(jarPath)).map(URL::toString).collect(Collectors.toList());
List<String> jars = configuration.get(PipelineOptions.JARS);
if (jars == null) {
configuration.set(PipelineOptions.JARS, pathList);
} else {
if (jars != null) {
CollUtil.addAll(jars, pathList);
}
Map<String, Object> flinkConfigurationMap = getFlinkConfigurationMap();
flinkConfigurationMap.put(PipelineOptions.JARS.key(), jars);
}

@Override
public <T> void addConfiguration(ConfigOption<T> option, T value) {
Map<String, Object> flinkConfigurationMap = getFlinkConfigurationMap();
flinkConfigurationMap.put(option.key(), value);
}

private Map<String, Object> getFlinkConfigurationMap() {
Field configuration = null;
try {
configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration");
configuration.setAccessible(true);
Configuration o = (Configuration) configuration.get(getStreamExecutionEnvironment());
Field confData = Configuration.class.getDeclaredField("confData");
confData.setAccessible(true);
Map<String, Object> temp = (Map<String, Object>) confData.get(o);
return temp;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,14 @@ public static CustomTableEnvironmentImpl create(
EnvironmentSettings.newInstance().withClassLoader(classLoader).build());
}

public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) {
public static CustomTableEnvironmentImpl createBatch(
StreamExecutionEnvironment executionEnvironment, ClassLoader classLoader) {
return create(
executionEnvironment,
EnvironmentSettings.newInstance().inBatchMode().build());
EnvironmentSettings.newInstance()
.withClassLoader(classLoader)
.inBatchMode()
.build());
}

public static CustomTableEnvironmentImpl create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,14 @@ public static CustomTableEnvironmentImpl create(
EnvironmentSettings.newInstance().withClassLoader(classLoader).build());
}

public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) {
public static CustomTableEnvironmentImpl createBatch(
StreamExecutionEnvironment executionEnvironment, ClassLoader classLoader) {
return create(
executionEnvironment,
EnvironmentSettings.newInstance().inBatchMode().build());
EnvironmentSettings.newInstance()
.withClassLoader(classLoader)
.inBatchMode()
.build());
}

public static CustomTableEnvironmentImpl create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,14 @@ public static CustomTableEnvironmentImpl create(
EnvironmentSettings.newInstance().withClassLoader(classLoader).build());
}

public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) {
public static CustomTableEnvironmentImpl createBatch(
StreamExecutionEnvironment executionEnvironment, ClassLoader classLoader) {
return create(
executionEnvironment,
EnvironmentSettings.newInstance().inBatchMode().build());
EnvironmentSettings.newInstance()
.withClassLoader(classLoader)
.inBatchMode()
.build());
}

public static CustomTableEnvironmentImpl create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.data.model.LineageRel;
import org.dinky.data.result.SqlExplainResult;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
Expand Down Expand Up @@ -93,4 +94,10 @@ default void addJar(File... jarPath) {
CollUtil.addAll(jars, pathList);
}
}

default <T> void addConfiguration(ConfigOption<T> option, T value) {
Configuration configuration =
(Configuration) getStreamExecutionEnvironment().getConfiguration();
configuration.set(option, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ public AppBatchExecutor(ExecutorConfig executorConfig, DinkyClassLoader classLoa

@Override
CustomTableEnvironment createCustomTableEnvironment(ClassLoader classLoader) {
return CustomTableEnvironmentImpl.createBatch(environment);
return CustomTableEnvironmentImpl.createBatch(environment, classLoader);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,6 @@ public LocalBatchExecutor(ExecutorConfig executorConfig, DinkyClassLoader classL

@Override
CustomTableEnvironment createCustomTableEnvironment(ClassLoader classLoader) {
return CustomTableEnvironmentImpl.createBatch(environment);
return CustomTableEnvironmentImpl.createBatch(environment, classLoader);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,6 @@ public RemoteBatchExecutor(ExecutorConfig executorConfig, DinkyClassLoader class

@Override
CustomTableEnvironment createCustomTableEnvironment(ClassLoader classLoader) {
return CustomTableEnvironmentImpl.createBatch(environment);
return CustomTableEnvironmentImpl.createBatch(environment, classLoader);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -453,12 +453,10 @@ private static List<String> execPyAndGetUdfNameList(String pyPath, String pyFile

public static void addConfigurationClsAndJars(
CustomTableEnvironment customTableEnvironment, List<URL> jarList, List<URL> classpaths) {
Configuration configuration = (Configuration)
customTableEnvironment.getStreamExecutionEnvironment().getConfiguration();
configuration.set(
customTableEnvironment.addConfiguration(
PipelineOptions.CLASSPATHS,
classpaths.stream().map(URL::toString).collect(Collectors.toList()));
configuration.set(
customTableEnvironment.addConfiguration(
PipelineOptions.JARS, jarList.stream().map(URL::toString).collect(Collectors.toList()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ const Result = (props: any) => {
if (consoleData.result && !isRefresh) {
setData(consoleData.result);
} else {
if (current.dialect == DIALECT.FLINK_SQL) {
if (current.dialect && current.dialect.toLowerCase() == DIALECT.FLINK_SQL) {
// flink sql
// to do: get job data by history id list, not flink jid
if (current.id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,14 @@ const Explain: React.FC<ExplainProps> = (props: any) => {
// if (selectsql == null || selectsql == '') {
// selectsql = current.value;
// }
// let useSession = !!currentSession.session;
let param = {
...current,
// useSession: useSession,
// session: currentSession.session,
configJson: current?.config,
taskId: current?.id
};
setResult(<Text>{l('pages.datastudio.explain.validate')}</Text>);
setExplainData([]);
const result = explainSql(param);
const result = explainSql(l('pages.datastudio.editor.checking', '', { jobName: current?.name }),param);
result.then((res) => {
const errorExplainData: [] = [];
let errorCount: number = 0;
Expand Down
9 changes: 4 additions & 5 deletions dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,19 @@
*
*/

import { postAll } from '@/services/api';
import { handleGetOption, handleOption } from '@/services/BusinessCrud';
import {handleGetOption, handleOption} from '@/services/BusinessCrud';
import { DIALECT } from '@/services/constants';

export async function explainSql(params: any) {
return postAll('/api/task/explainSql', params);
export async function explainSql(title: string, params: any) {
return handleOption('/api/task/explainSql', title, params);
}

export async function getJobPlan(title: string, params: any) {
return handleOption('/api/task/getJobPlan', title, params);
}

export async function debugTask(title: string, params: any) {
return postAll('/api/task/debugTask', params);
return handleOption('/api/task/debugTask', title, params);
}

export async function executeSql(title: string, id: number) {
Expand Down
1 change: 1 addition & 0 deletions dinky-web/src/services/BusinessCrud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ export const handlePutData = async (url: string, fields: any) => {
return false;
}
};

export const handlePutDataJson = async (url: string, fields: any) => {
const tipsTitle = fields?.id ? l('app.request.update') : l('app.request.add');
await LoadingMessageAsync(l('app.request.running') + tipsTitle);
Expand Down

0 comments on commit e50a3dc

Please sign in to comment.