From e807c0e9f2f8c3507ae35870f715449928ad5de7 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Fri, 1 Sep 2023 21:59:56 +0800 Subject: [PATCH] [Feature] Yarn supports pyflink (#2956) * yarn application supports pyflink * yarn perjob supports pyflink * pyflink supports flink connector --- .../streampark/common/conf/ConfigConst.scala | 8 ++ .../streampark/common/conf/Workspace.scala | 2 + .../streampark/common/util/EnvUtils.java | 41 +++++++++ .../src/main/assembly/assembly.xml | 5 ++ .../src/main/assembly/python/.gitkeep | 0 .../service/impl/ApplicationServiceImpl.java | 9 ++ .../service/impl/ResourceServiceImpl.java | 4 + .../src/locales/lang/en/flink/app.ts | 2 +- .../src/locales/lang/zh-CN/flink/app.ts | 2 +- .../flink/app/components/UploadJobJar.vue | 2 +- .../flink/client/bean/SubmitRequest.scala | 1 + .../streampark-flink-client-core/pom.xml | 7 ++ .../src/main/resources/pyflink.md | 87 +++++++++++++++++++ .../client/impl/YarnApplicationClient.scala | 45 +++++++++- .../flink/client/trait/FlinkClientTrait.scala | 65 +++++++++++--- .../streampark/flink/util/FlinkUtils.scala | 16 ++++ 16 files changed, 280 insertions(+), 16 deletions(-) create mode 100644 streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java create mode 100644 streampark-console/streampark-console-service/src/main/assembly/python/.gitkeep create mode 100644 streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/resources/pyflink.md diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala index 2d6b74fbad..6c5c6a6d5f 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala @@ -26,6 +26,14 @@ object ConfigConst { val PARAM_PREFIX = "--" + /** pyflink */ + + val PYTHON_SUFFIX = ".py" + + val PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver" + + val PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3" + /** about parameter... */ val KEY_APP_HOME = "app.home" diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala index 03b0c96824..d914780a1a 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala @@ -102,6 +102,8 @@ case class Workspace(storageType: StorageType) { lazy val APP_UPLOADS = s"$WORKSPACE/uploads" + lazy val APP_PYTHON_VENV = s"$WORKSPACE/python/venv.zip" + lazy val APP_WORKSPACE = s"$WORKSPACE/workspace" lazy val APP_FLINK = s"$WORKSPACE/flink" diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java b/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java new file mode 100644 index 0000000000..a8fd54be3a --- /dev/null +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.common.util;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Map;
+
+/** Mutates the environment of the current JVM process by reflectively unwrapping the unmodifiable map behind System.getenv(). There is no public API for this; it relies on JDK internals. */
+public class EnvUtils {
+  public static void setEnv(String name, String value) throws Exception {
+    getModifiableEnvironment().put(name, value);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static Map<String, String> getModifiableEnvironment() throws Exception {
+    Class<?> pe = Class.forName("java.lang.ProcessEnvironment");
+    Method getenv = pe.getDeclaredMethod("getenv");
+    getenv.setAccessible(true);
+    Object unmodifiableEnvironment = getenv.invoke(null);
+    Class<?> map = Class.forName("java.util.Collections$UnmodifiableMap");
+    Field m = map.getDeclaredField("m");
+    m.setAccessible(true);
+    return (Map<String, String>) m.get(unmodifiableEnvironment);
+  }
+}
+
diff --git a/streampark-console/streampark-console-service/src/main/assembly/assembly.xml b/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
index aa4677c4a9..5161835c01 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
+++ b/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
@@ -69,6 +69,11 @@
             <outputDirectory>logs</outputDirectory>
             <fileMode>0755</fileMode>
         </fileSet>
+        <fileSet>
+            <directory>${project.build.directory}/../src/main/assembly/python</directory>
+            <outputDirectory>python</outputDirectory>
+            <fileMode>0755</fileMode>
+        </fileSet>
         <fileSet>
             <directory>${project.build.directory}/../src/main/assembly/temp</directory>
             <outputDirectory>temp</outputDirectory>
diff --git a/streampark-console/streampark-console-service/src/main/assembly/python/.gitkeep b/streampark-console/streampark-console-service/src/main/assembly/python/.gitkeep
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 99a28a87c6..9e970408ff 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1559,6 +1559,14 @@ public void start(Application appParam, boolean auto) throws Exception {
     String applicationArgs =
         variableService.replaceVariable(application.getTeamId(), application.getArgs());
 
+    String pyflinkFilePath = "";
+    Resource resource =
+        resourceService.findByResourceName(application.getTeamId(), application.getJar());
+    if (resource != null
+        && StringUtils.isNotBlank(resource.getFilePath())
+        && resource.getFilePath().endsWith(ConfigConst.PYTHON_SUFFIX())) {
+      pyflinkFilePath = resource.getFilePath();
+    }
     SubmitRequest submitRequest =
         new SubmitRequest(
             flinkEnv.getFlinkVersion(),
@@ -1574,6 +1582,7 @@ public void start(Application appParam, boolean auto) throws Exception {
             getSavePointed(appParam),
             appParam.getRestoreMode() == null ?
                null : RestoreMode.of(appParam.getRestoreMode()),
             applicationArgs,
+            pyflinkFilePath,
             buildResult,
             kubernetesSubmitParam,
             extraParameter);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index 5e667d5b7e..40314bca53 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -17,6 +17,7 @@
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.conf.ConfigConst;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.util.Utils;
@@ -286,6 +287,10 @@ public RestResponse checkResource(Resource resourceParam) throws JsonProcessingE
       resp.put("exception", Utils.stringifyException(e));
       return RestResponse.success().data(resp);
     }
+    // python files carry no jar manifest, so skip the main-class check below
+    if (jarFile.getName().endsWith(ConfigConst.PYTHON_SUFFIX())) {
+      return RestResponse.success().data(resp);
+    }
     Manifest manifest = Utils.getJarManifest(jarFile);
     String mainClass = manifest.getMainAttributes().getValue("Main-Class");
 
diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
index df9ee4a81c..f9fd462d64 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
@@ -62,7 +62,7 @@ export default {
   programJar: 'Program Jar',
   dynamicProperties: 'Dynamic Properties',
   hadoopConfigTitle: 'System Hadoop Configuration',
-  dragUploadTitle: 'Click or drag jar to this area to upload',
+  dragUploadTitle: 'Click or drag a jar or py file to this area to upload',
   dragUploadTip: 'Support for a single upload.
You can upload a local jar here to support for current Job',
  dependencyError: 'please set flink version first.',
diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
index bedd7fbb68..51f72ceb4b 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
@@ -62,7 +62,7 @@ export default {
  programJar: '程序Jar文件',
  dynamicProperties: '动态参数',
  hadoopConfigTitle: '系统 Hadoop',
-  dragUploadTitle: '单击或拖动 jar 到此区域以上传',
+  dragUploadTitle: '单击或拖动 jar 或 py 到此区域以上传',
  dragUploadTip: '支持单次上传。您可以在此处上传本地 jar 以支持当前作业',
  dependencyError: '请先检查flink 版本.',
  status: '运行状态',
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/UploadJobJar.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/UploadJobJar.vue
index d84913e406..ba8f120208 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/UploadJobJar.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/UploadJobJar.vue
@@ -53,7 +53,7 @@
  /* Callback before file upload */
  function handleBeforeUpload(file) {
    if (file.type !== 'application/java-archive') {
-      if (!/\.(jar|JAR)$/.test(file.name)) {
+      if (!/\.(jar|JAR|py)$/.test(file.name)) {
        emit('update:loading', false);
-        createMessage.error('Only jar files can be uploaded! please check your file.');
+        createMessage.error('Only jar or py files can be uploaded! Please check your file.');
        return false;
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 5e627add8f..575d6eea96 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -63,6 +63,7 @@ case class SubmitRequest(
    savePoint: String,
    restoreMode: RestoreMode,
    args: String,
+    pyflinkFilePath: String = "",
    @Nullable buildResult: BuildResult,
    @Nullable k8sSubmitParam: KubernetesSubmitParam,
    @Nullable extraParameter: JavaMap[String, Any]) {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
index adadd682e0..9e942fc1f8 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
@@ -64,6 +64,13 @@
        </dependency>

+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-python_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client-api</artifactId>
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/resources/pyflink.md b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/resources/pyflink.md
new file mode 100644
index 0000000000..385ce5d4af

### 1. Create a Python virtual environment on Linux

#### 1.1. Prepare the 'setup-pyflink-virtual-env.sh' script

The default version of 'apache-flink' is 1.16.2; change it as required. The content of the script is as follows:

```shell
set -e
# Download the Python 3.7 miniconda.sh script.
wget "https://repo.continuum.io/miniconda/Miniconda3-py37_4.9.2-Linux-x86_64.sh" -O "miniconda.sh"

# Make the miniconda.sh script executable.
chmod +x miniconda.sh

# Create the Python virtual environment.
./miniconda.sh -b -p venv

# Activate the Conda Python virtual environment.
source venv/bin/activate ""

# Install the PyFlink dependency.
pip install "apache-flink==1.16.2"

# Deactivate the Conda Python virtual environment.
conda deactivate

# Remove the cached packages.
rm -rf venv/pkgs

# Package the prepared Conda Python virtual environment.
zip -r venv.zip venv
```

#### 1.2. Prepare the 'build.sh' script

The '/build' directory must be created in advance; you can also change it to another directory. The content of the script is as follows:

```shell
#!/bin/bash
set -e -x
yum install -y zip wget

cd /root/
bash /build/setup-pyflink-virtual-env.sh
mv venv.zip /build/
```

#### 1.3. Execute the following command

```shell
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux2014_x86_64 ./build.sh
```

After this command finishes, a file named venv.zip is generated; it contains the Python 3.7 virtual environment. You can also modify the script above to install a different Python version, or to install the third-party Python packages your job requires into the virtual environment.

### 2. Upload venv.zip to HDFS

venv.zip is about 539 MB and must be uploaded to HDFS manually:

```shell
hadoop fs -put ./venv.zip /streampark/python
```

### 3. Copy venv.zip to $WORKSPACE/python

```shell
cp ./venv.zip $WORKSPACE/python
```

### 4. Copy the Python dependencies to $FLINK_HOME/lib

```shell
cp -r $FLINK_HOME/opt/python $FLINK_HOME/lib

cp $FLINK_HOME/opt/flink-python-* $FLINK_HOME/lib
```

### 5. If your PyFlink job uses a flink connector dependency, put the connector jar in $FLINK_HOME/lib
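For example, for a job that reads through the JDBC connector (the jar name below is illustrative; use the connector and version that match your Flink installation):

```shell
# Copy the connector jar into Flink's lib directory so it is shipped with the job.
cp ./flink-connector-jdbc-3.1.0-1.16.jar $FLINK_HOME/lib/
```

StreamPark then picks these jars up automatically: it scans $FLINK_HOME/lib for files matching flink.*connector.*\.jar and passes them to the job via the pipeline.jars option.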
### 6. Reference documents

```text
https://help.aliyun.com/document_detail/413966.html

https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/deployment/cli/#submitting-pyflink-jobs

https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/python/python_config/
```

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 304735e2d3..9c83659d07 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -17,23 +17,29 @@
 package org.apache.streampark.flink.client.impl
 
-import org.apache.streampark.common.conf.Workspace
+import org.apache.streampark.common.conf.{ConfigConst, Workspace}
 import org.apache.streampark.common.enums.DevelopmentMode
+import org.apache.streampark.common.fs.FsOperator
 import org.apache.streampark.common.util.{HdfsUtils, Utils}
 import org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
+import org.apache.streampark.flink.util.FlinkUtils
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
 import org.apache.flink.client.deployment.application.ApplicationConfiguration
 import org.apache.flink.client.program.ClusterClient
 import org.apache.flink.configuration._
+import org.apache.flink.python.PythonOptions
 import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils}
 import org.apache.flink.runtime.util.HadoopUtils
 import org.apache.flink.yarn.configuration.YarnConfigOptions
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.ApplicationId
 
+import java.io.File
+import java.util
 import java.util.Collections
 import java.util.concurrent.Callable
 
@@ -93,6 +99,43 @@
       // yarn application Type
       .safeSet(YarnConfigOptions.APPLICATION_TYPE, submitRequest.applicationType.getName)
 
+    if (StringUtils.isNotBlank(submitRequest.pyflinkFilePath)) {
+      val pythonVenv: String = workspace.APP_PYTHON_VENV
+      if (!FsOperator.hdfs.exists(pythonVenv)) {
+        throw new RuntimeException(s"File $pythonVenv does not exist")
+      }
+      val pyflinkFile: File = new File(submitRequest.pyflinkFilePath)
+
+      val argList = new util.ArrayList[String]()
+      argList.add("-pym")
+      argList.add(pyflinkFile.getName.stripSuffix(ConfigConst.PYTHON_SUFFIX))
+
+      val pythonFlinkconnectorJars: String =
+        FlinkUtils.getPythonFlinkconnectorJars(submitRequest.flinkVersion.flinkHome)
+      if (StringUtils.isNotBlank(pythonFlinkconnectorJars)) {
+        flinkConfig.setString(PipelineOptions.JARS.key(), pythonFlinkconnectorJars)
+      }
+
+      // yarn.ship-files
+      flinkConfig.setString(
+        YarnConfigOptions.SHIP_FILES.key(),
+        pyflinkFile.getParentFile.getAbsolutePath)
+
+      flinkConfig
+        // python.archives
+        .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
+        // python.client.executable
+        .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
+        // python.executable
+        .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
+        // python.files
+        .safeSet(PythonOptions.PYTHON_FILES, pyflinkFile.getParentFile.getName)
+        .safeSet(
+          ApplicationConfiguration.APPLICATION_MAIN_CLASS,
+          ConfigConst.PYTHON_DRIVER_CLASS_NAME)
+        .safeSet(ApplicationConfiguration.APPLICATION_ARGS, argList)
+    }
+
     logInfo(s"""
                |------------------------------------------------------------------
                |Effective submit configuration: $flinkConfig
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 6ff949bd29..3a179d8bc8 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -17,13 +17,15 @@
 package org.apache.streampark.flink.client.`trait`
 
+import org.apache.streampark.common.conf.{ConfigConst, Workspace}
 import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode, RestoreMode}
-import org.apache.streampark.common.util.{DeflaterUtils, Logger}
+import org.apache.streampark.common.fs.FsOperator
+import org.apache.streampark.common.util.{DeflaterUtils, EnvUtils, Logger}
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.core.FlinkClusterClient
 import org.apache.streampark.flink.core.conf.FlinkRunOption
+import org.apache.streampark.flink.util.FlinkUtils
 
 import com.google.common.collect.Lists
 import org.apache.commons.cli.{CommandLine, Options}
@@ -35,6 +37,7 @@
 import org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines
 import org.apache.flink.client.deployment.application.ApplicationConfiguration
 import org.apache.flink.client.program.{ClusterClient, PackagedProgram, PackagedProgramUtils}
 import org.apache.flink.configuration._
+import org.apache.flink.python.PythonOptions
 import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointConfigOptions}
 import org.apache.flink.util.FlinkException
 import org.apache.flink.util.Preconditions.checkNotNull
@@ -141,6 +144,17 @@ trait FlinkClientTrait extends Logger {
       })
     }
 
+    if (StringUtils.isNotBlank(submitRequest.pyflinkFilePath)) {
+      val flinkOptPath: String = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR)
+      if (StringUtils.isBlank(flinkOptPath)) {
+        logWarn(s"Environment variable ${ConfigConstants.ENV_FLINK_OPT_DIR} is not set")
+        val flinkHome = submitRequest.flinkVersion.flinkHome
+        EnvUtils.setEnv(ConfigConstants.ENV_FLINK_OPT_DIR, s"$flinkHome/opt")
+        logInfo(
+          s"Temporarily set environment variable ${ConfigConstants.ENV_FLINK_OPT_DIR} = $flinkHome/opt")
+      }
+    }
+
     setConfig(submitRequest, flinkConfig)
 
     doSubmit(submitRequest, flinkConfig)
 
@@ -223,16 +237,43 @@
      flinkConfig: Configuration,
      submitRequest: SubmitRequest,
      jarFile: File): (PackagedProgram, JobGraph) = {
-    val packageProgram = PackagedProgram.newBuilder
-      .setJarFile(jarFile)
-      .setEntryPointClassName(
-        flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
-      .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
-      .setArguments(
-        flinkConfig
-          .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
-          .orElse(Lists.newArrayList()): _*)
-      .build()
+    val packageProgram: PackagedProgram =
+      if (StringUtils.isNotBlank(submitRequest.pyflinkFilePath)) {
+        val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
+        if (!FsOperator.lfs.exists(pythonVenv)) {
+          throw new RuntimeException(s"File $pythonVenv does not exist")
+        }
+        flinkConfig
+          // python.archives
+          .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
+          // python.client.executable
+          .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
+          // python.executable
+          .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
+
+        val pythonFlinkconnectorJars: String =
+          FlinkUtils.getPythonFlinkconnectorJars(submitRequest.flinkVersion.flinkHome)
+        if (StringUtils.isNotBlank(pythonFlinkconnectorJars)) {
+          flinkConfig.setString(PipelineOptions.JARS.key(), pythonFlinkconnectorJars)
+        }
+
+        PackagedProgram.newBuilder
+          .setEntryPointClassName(ConfigConst.PYTHON_DRIVER_CLASS_NAME)
+          .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
+          .setArguments("-py", submitRequest.pyflinkFilePath)
+          .build()
+      } else {
+        PackagedProgram.newBuilder
+          .setJarFile(jarFile)
+          .setEntryPointClassName(
+            flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
+          .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
+          .setArguments(
+            flinkConfig
+              .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
+              .orElse(Lists.newArrayList()): _*)
+          .build()
+      }
 
     val jobGraph = PackagedProgramUtils.createJobGraph(
       packageProgram,
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
index 58d38d3117..11364f02b7 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
@@ -48,6 +48,22 @@ object FlinkUtils {
     }
   }
 
+  /**
+   * Returns the flink connector jars (flink-connector-xxx.jar and flink-sql-connector-xxx.jar)
+   * under $flinkHome/lib as a ';'-separated list of file:// URLs, for example:
+   * file:///flink-1.16.2/lib/flink-connector-jdbc-3.1.0-1.16.jar;file:///flink-1.16.2/lib/flink-sql-connector-mysql-cdc-2.4.0.jar
+   * Returns an empty string if $flinkHome/lib is missing or holds no connector jar.
+   */
+  def getPythonFlinkconnectorJars(flinkHome: String): String = {
+    Option(new File(s"$flinkHome/lib").list())
+      .getOrElse(Array.empty[String])
+      .filter(_.matches("flink.*connector.*\\.jar")) match {
+      case array if array.length > 0 =>
+        array.map(jar => s"file://$flinkHome/lib/$jar").mkString(";")
+      case _ => ""
+    }
+  }
+
   def isCheckpointEnabled(map: util.Map[String, String]): Boolean = {
     val checkpointInterval: Duration = TimeUtils.parseDuration(
       map.getOrDefault(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key, "0ms"))