From 1c657bd992848bded54f3a0c153d60760cf13d61 Mon Sep 17 00:00:00 2001 From: ZackYoung Date: Fri, 20 Sep 2024 18:17:15 +0800 Subject: [PATCH] [BugFix][Flink]Fix execute jar submit in yarn-application (#3820) Co-authored-by: zackyoungh --- .../dinky/service/impl/TaskServiceImpl.java | 4 +- .../java/org/dinky/service/task/BaseTask.java | 2 + .../dinky/url/RsURLStreamHandlerFactory.java | 2 +- .../dinky/trans/dml/ExecuteJarOperation.java | 8 +++- .../dinky/url/RsURLStreamHandlerFactory.java | 10 +++++ .../org/dinky/context/TaskContextHolder.java | 37 +++++++++++++++++++ .../java/org/dinky/executor/Executor.java | 4 +- .../job/builder/JobJarStreamGraphBuilder.java | 2 + .../gateway/yarn/YarnApplicationGateway.java | 18 ++++++--- 9 files changed, 75 insertions(+), 12 deletions(-) create mode 100644 dinky-common/src/main/java/org/dinky/context/TaskContextHolder.java diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index c91a3cef51..0d8a4c6fe0 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -201,9 +201,7 @@ public TaskDTO prepareTask(TaskSubmitDto submitDto) { @ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE) public JobResult executeJob(TaskDTO task) throws Exception { - JobResult jobResult = BaseTask.getTask(task).execute(); - log.info("execute job finished,status is {}", jobResult.getStatus()); - return jobResult; + return executeJob(task, false); } @ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE) diff --git a/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java b/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java index 5b34a446d4..bb41ea9db1 100644 --- a/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java +++ b/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java @@ -20,6 +20,7 @@ package org.dinky.service.task; import org.dinky.config.Dialect; +import org.dinky.context.TaskContextHolder; import org.dinky.data.annotations.SupportDialect; import org.dinky.data.dto.TaskDTO; import org.dinky.data.exception.NotSupportExplainExcepition; @@ -64,6 +65,7 @@ public static BaseTask getTask(TaskDTO taskDTO) { if (annotation != null) { for (Dialect dialect : annotation.value()) { if (dialect.isDialect(taskDTO.getDialect())) { + TaskContextHolder.setDialect(dialect); return (BaseTask) ReflectUtil.newInstance(clazz, taskDTO); } } diff --git a/dinky-admin/src/main/java/org/dinky/url/RsURLStreamHandlerFactory.java b/dinky-admin/src/main/java/org/dinky/url/RsURLStreamHandlerFactory.java index f80dadc550..81e3b10d16 100644 --- a/dinky-admin/src/main/java/org/dinky/url/RsURLStreamHandlerFactory.java +++ b/dinky-admin/src/main/java/org/dinky/url/RsURLStreamHandlerFactory.java @@ -33,7 +33,7 @@ @Profile("!test") public class RsURLStreamHandlerFactory implements URLStreamHandlerFactory { - private final List notContains = Arrays.asList("jar", "file"); + private final List notContains = Arrays.asList("jar", "file", "http", "https"); @Override public URLStreamHandler createURLStreamHandler(String protocol) { diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java index 280590661a..2cce7d1792 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java @@ -21,6 +21,8 @@ import static org.dinky.utils.RunTimeUtil.extractArgs; +import org.dinky.config.Dialect; +import org.dinky.context.TaskContextHolder; import org.dinky.executor.CustomTableEnvironment; import org.dinky.trans.AbstractOperation; import org.dinky.trans.ExtendOperation; @@ -61,7 +63,11 @@ public ExecuteJarOperation(String statement) { public Optional execute(CustomTableEnvironment tEnv) { try { StreamExecutionEnvironment streamExecutionEnvironment = tEnv.getStreamExecutionEnvironment(); - FlinkStreamEnvironmentUtil.executeAsync(getStreamGraph(tEnv), streamExecutionEnvironment); + if (TaskContextHolder.getDialect().equals(Dialect.FLINK_JAR)) { + FlinkStreamEnvironmentUtil.executeAsync(getStreamGraph(tEnv), streamExecutionEnvironment); + } else { + throw new RuntimeException("Please perform Execute jar syntax in the FlinkJar task !"); + } } catch (Exception e) { throw new RuntimeException(e); } diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/url/RsURLStreamHandlerFactory.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/url/RsURLStreamHandlerFactory.java index b452a583cb..7a7dc3c6c2 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/url/RsURLStreamHandlerFactory.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/url/RsURLStreamHandlerFactory.java @@ -21,12 +21,22 @@ import java.net.URLStreamHandler; import java.net.URLStreamHandlerFactory; +import java.util.Arrays; +import java.util.List; + +import cn.hutool.core.util.StrUtil; public class RsURLStreamHandlerFactory implements URLStreamHandlerFactory { private static final String PREFIX = "sun.net.www.protocol"; + private final List notContains = Arrays.asList("jar", "file", "http", "https"); @Override public URLStreamHandler createURLStreamHandler(String protocol) { + for (String tempProtocol : notContains) { + if (tempProtocol.equals(StrUtil.sub(protocol, 0, tempProtocol.length()))) { + return null; + } + } if (ResourceFileSystem.URI_SCHEMA.getScheme().equals(protocol)) { return new RsURLStreamHandler(); } diff --git a/dinky-common/src/main/java/org/dinky/context/TaskContextHolder.java b/dinky-common/src/main/java/org/dinky/context/TaskContextHolder.java new file mode 100644 index 0000000000..5d872226b3 --- /dev/null +++ b/dinky-common/src/main/java/org/dinky/context/TaskContextHolder.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.context; + +import org.dinky.config.Dialect; + +import java.util.Optional; + +public class TaskContextHolder { + private static final ThreadLocal DIALECT_THREAD_LOCAL = new InheritableThreadLocal<>(); + + public static Dialect getDialect() { + return Optional.ofNullable(DIALECT_THREAD_LOCAL.get()) + .orElseThrow(() -> new RuntimeException("task dialect is null")); + } + + public static void setDialect(Dialect dialect) { + DIALECT_THREAD_LOCAL.set(dialect); + } +} diff --git a/dinky-core/src/main/java/org/dinky/executor/Executor.java b/dinky-core/src/main/java/org/dinky/executor/Executor.java index e4febf0782..f9be626daf 100644 --- a/dinky-core/src/main/java/org/dinky/executor/Executor.java +++ b/dinky-core/src/main/java/org/dinky/executor/Executor.java @@ -220,8 +220,8 @@ public void initPyUDF(String executable, String... udfPyFilePath) { } Configuration configuration = tableEnvironment.getConfig().getConfiguration(); - configuration.setString(PythonOptions.PYTHON_FILES, String.join(",", udfPyFilePath)); - configuration.setString(PythonOptions.PYTHON_CLIENT_EXECUTABLE, executable); + configuration.set(PythonOptions.PYTHON_FILES, String.join(",", udfPyFilePath)); + configuration.set(PythonOptions.PYTHON_CLIENT_EXECUTABLE, executable); } private void addJar(String... jarPath) { diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java index 42fea3de21..3e8ee6c83a 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java @@ -45,6 +45,7 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; @@ -117,6 +118,7 @@ public void run() throws Exception { } private GatewayResult submitGateway() throws Exception { + configuration.set(PipelineOptions.JARS, getUris(job.getStatement())); config.addGatewayConfig(configuration); config.getGatewayConfig().setSql(job.getStatement()); return Gateway.build(config.getGatewayConfig()).submitJar(jobManager.getUdfPathContextHolder()); diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnApplicationGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnApplicationGateway.java index 2193be4ca9..46927b5cf5 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnApplicationGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnApplicationGateway.java @@ -39,10 +39,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import java.io.File; -import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.stream.Collectors; +import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.URLUtil; /** @@ -58,8 +59,9 @@ public GatewayType getType() { } /** - * format url - *

if url is rs protocol, convert to file path

+ * format url + *

if url is rs protocol, convert to file path

+ * * @param url url * @return formatted url */ @@ -77,8 +79,11 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) { init(); } + List beforePipelineJars = configuration.get(PipelineOptions.JARS); + AppConfig appConfig = config.getAppConfig(); configuration.set(PipelineOptions.JARS, Collections.singletonList(formatUrl(appConfig.getUserJarPath()))); + configuration.setString( "python.files", udfPathContextHolder.getPyUdfFile().stream().map(File::getName).collect(Collectors.joining(","))); @@ -90,13 +95,16 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) { createClusterSpecificationBuilder(); ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(userJarParas, appConfig.getUserJarMainAppClass()); - YarnResult result = YarnResult.build(getType()); String webUrl; try (YarnClusterDescriptor yarnClusterDescriptor = createYarnClusterDescriptorWithJar(udfPathContextHolder)) { ClusterDescriptorAdapterImpl clusterDescriptorAdapter = new ClusterDescriptorAdapterImpl(yarnClusterDescriptor); - clusterDescriptorAdapter.addShipFiles(Arrays.asList(preparSqlFile())); + if (CollUtil.isNotEmpty(beforePipelineJars)) { + clusterDescriptorAdapter.addShipFiles( + beforePipelineJars.stream().map(URLUtils::toFile).collect(Collectors.toList())); + } + clusterDescriptorAdapter.addShipFiles(Collections.singletonList(preparSqlFile())); addConfigParas( CustomerConfigureOptions.EXEC_SQL_FILE, configuration.get(CustomerConfigureOptions.EXEC_SQL_FILE));