diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala index 2d6b74fbad..6c5c6a6d5f 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala @@ -26,6 +26,14 @@ object ConfigConst { val PARAM_PREFIX = "--" + /** pyflink */ + + val PYTHON_SUFFIX = ".py" + + val PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver" + + val PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3" + /** about parameter... */ val KEY_APP_HOME = "app.home" diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala index 03b0c96824..d914780a1a 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala @@ -102,6 +102,8 @@ case class Workspace(storageType: StorageType) { lazy val APP_UPLOADS = s"$WORKSPACE/uploads" + lazy val APP_PYTHON_VENV = s"$WORKSPACE/python/venv.zip" + lazy val APP_WORKSPACE = s"$WORKSPACE/workspace" lazy val APP_FLINK = s"$WORKSPACE/flink" diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java b/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java new file mode 100644 index 0000000000..a8fd54be3a --- /dev/null +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.streampark.common.util; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.Map; + + +public class EnvUtils { + public static void setEnv(String name, String value) throws Exception { + getModifiableEnvironment().put(name, value); + } + + @SuppressWarnings("unchecked") + private static Map getModifiableEnvironment() throws Exception { + Class pe = Class.forName("java.lang.ProcessEnvironment"); + Method getenv = pe.getDeclaredMethod("getenv"); + getenv.setAccessible(true); + Object unmodifiableEnvironment = getenv.invoke(null); + Class map = Class.forName("java.util.Collections$UnmodifiableMap"); + Field m = map.getDeclaredField("m"); + m.setAccessible(true); + return (Map) m.get(unmodifiableEnvironment); + } +} + diff --git a/streampark-console/streampark-console-service/src/main/assembly/assembly.xml b/streampark-console/streampark-console-service/src/main/assembly/assembly.xml index 92c73dac48..50f77f561f 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/assembly.xml +++ b/streampark-console/streampark-console-service/src/main/assembly/assembly.xml @@ -68,6 +68,11 @@ logs 0755 + + ${project.build.directory}/../src/main/assembly/python + python + 0755 + ${project.build.directory}/../src/main/assembly/temp temp diff --git a/streampark-console/streampark-console-service/src/main/assembly/python/.gitkeep b/streampark-console/streampark-console-service/src/main/assembly/python/.gitkeep new file mode 100644 index 0000000000..e69de29bb2 diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 3dcf471ce2..01193e2781 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -1559,6 +1559,14 @@ public void start(Application appParam, boolean auto) throws Exception { String applicationArgs = variableService.replaceVariable(application.getTeamId(), application.getArgs()); + String pyflinkFilePath = ""; + Resource resource = + resourceService.findByResourceName(application.getTeamId(), application.getJar()); + if (resource != null + && StringUtils.isNotBlank(resource.getFilePath()) + && resource.getFilePath().endsWith(ConfigConst.PYTHON_SUFFIX())) { + pyflinkFilePath = resource.getFilePath(); + } SubmitRequest submitRequest = new SubmitRequest( flinkEnv.getFlinkVersion(), @@ -1574,6 +1582,7 @@ public void start(Application appParam, boolean auto) throws Exception { getSavePointed(appParam), appParam.getRestoreMode() == null ? null : RestoreMode.of(appParam.getRestoreMode()), applicationArgs, + pyflinkFilePath, buildResult, kubernetesSubmitParam, extraParameter); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java index 8b900afe51..1a66aeb248 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java @@ -17,6 +17,7 @@ package org.apache.streampark.console.core.service.impl; +import org.apache.streampark.common.conf.ConfigConst; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.fs.FsOperator; import org.apache.streampark.common.util.Utils; @@ -287,6 +288,9 @@ public RestResponse checkResource(Resource resourceParam) throws JsonProcessingE resp.put("exception", Utils.stringifyException(e)); return RestResponse.success().data(resp); } + if (jarFile.getName().endsWith(ConfigConst.PYTHON_SUFFIX())) { + return RestResponse.success().data(resp); + } Manifest manifest = Utils.getJarManifest(jarFile); String mainClass = manifest.getMainAttributes().getValue("Main-Class"); diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts index d1b94b5b9a..6ad7788fb0 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts @@ -62,7 +62,7 @@ export default { programJar: '程序Jar文件', dynamicProperties: '动态参数', hadoopConfigTitle: '系统 Hadoop', - dragUploadTitle: '单击或拖动 jar 到此区域以上传', + dragUploadTitle: '单击或拖动 jar或py 到此区域以上传', dragUploadTip: '支持单次上传。您可以在此处上传本地 jar 以支持当前作业', dependencyError: '请先检查flink 版本.', status: '运行状态', diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/UploadJobJar.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/UploadJobJar.vue index d84913e406..ba8f120208 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/UploadJobJar.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/UploadJobJar.vue @@ -53,7 +53,7 @@ /* Callback before file upload */ function handleBeforeUpload(file) { if (file.type !== 'application/java-archive') { - if (!/\.(jar|JAR)$/.test(file.name)) { + if (!/\.(jar|JAR|py)$/.test(file.name)) { emit('update:loading', false); createMessage.error('Only jar files can be uploaded! please check your file.'); return false; diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala index 5e627add8f..575d6eea96 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala @@ -63,6 +63,7 @@ case class SubmitRequest( savePoint: String, restoreMode: RestoreMode, args: String, + pyflinkFilePath: String = "", @Nullable buildResult: BuildResult, @Nullable k8sSubmitParam: KubernetesSubmitParam, @Nullable extraParameter: JavaMap[String, Any]) { diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml index adadd682e0..9e942fc1f8 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml @@ -64,6 +64,13 @@ + + org.apache.flink + flink-python_${scala.binary.version} + ${flink.version} + provided + + org.apache.hadoop hadoop-client-api diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/resources/pyflink.md b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/resources/pyflink.md new file mode 100644 index 0000000000..70dd1df004 --- /dev/null +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/resources/pyflink.md @@ -0,0 +1,79 @@ + +### 1. linux Creates Python virtual environments + +#### 1.1. Prepare the 'setup-pyflink-virtual-env.sh script'. The default version of 'apache-flink' is '1.16.2'. You can change the version as required.The content is as follows + +```shell +set -e +# 下载Python 3.7 miniconda.sh脚本。 +wget "https://repo.continuum.io/miniconda/Miniconda3-py37_4.9.2-Linux-x86_64.sh" -O "miniconda.sh" + +# 为Python 3.7 miniconda.sh脚本添加执行权限。 +chmod +x miniconda.sh + +# 创建Python的虚拟环境。 +./miniconda.sh -b -p venv + +# 激活Conda Python虚拟环境。 +source venv/bin/activate "" + +# 安装PyFlink依赖。 +pip install "apache-flink==1.16.2" + +# 退出Conda Python虚拟环境。 +conda deactivate + +# 删除缓存的包。 +rm -rf venv/pkgs + +# 将准备好的Conda Python虚拟环境打包。 +zip -r venv.zip venv +``` + +#### 1.2. Prepare the 'build.sh script'. The content is as follows + +The '/build' directory needs to be created by itself. You can also change it to another directory + +```shell +#!/bin/bash +set -e -x +yum install -y zip wget + +cd /root/ +bash /build/setup-pyflink-virtual-env.sh +mv venv.zip /build/ +``` + +#### 1.3. Execute the following command + +```shell +docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux2014_x86_64 ./build.sh +``` +After this command is executed, a file named venv.zip will be generated, which is the virtual environment of Python 3.7. You can also modify the above script, install another version of the Python virtual environment, or install the required third-party Python packages in the virtual environment. + +### 2. Upload venv.zip to hdfs + +venv.zip is about 539M in size and needs to be uploaded to hdfs by itself + +```shell +hadoop fs -put ./venv.zip /streampark/python +``` + +### 3. Copy Python dependencies to $FLINK_HOME/lib + +```shell +cp -r $FLINK_HOME/opt/python $FLINK_HOME/lib + +cp $FLINK_HOME/opt/flink-python-* $FLINK_HOME/lib +``` + +### 4. Reference document +```text +https://help.aliyun.com/document_detail/413966.html#:~:text=.%2Fsetup-pyflink-%20virtual%20-env.sh,%E8%AF%A5%E5%91%BD%E4%BB%A4%E6%89%A7%E8%A1%8C%E5%AE%8C%E6%88%90%E5%90%8E%EF%BC%8C%E4%BC%9A%E7%94%9F%E6%88%90%E4%B8%80%E4%B8%AA%E5%90%8D%E4%B8%BA%20venv%20%E7%9A%84%E7%9B%AE%E5%BD%95%EF%BC%8C%E5%8D%B3%E4%B8%BAPython%203.6%E7%9A%84%E8%99%9A%E6%8B%9F%E7%8E%AF%E5%A2%83%E3%80%82 + +https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/deployment/cli/#submitting-pyflink-jobs + +https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/python/python_config/ +``` + + diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala index 2b981b3054..8834d339b5 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala @@ -17,23 +17,28 @@ package org.apache.streampark.flink.client.impl -import org.apache.streampark.common.conf.Workspace +import org.apache.streampark.common.conf.{ConfigConst, Workspace} import org.apache.streampark.common.enums.DevelopmentMode +import org.apache.streampark.common.fs.FsOperator import org.apache.streampark.common.util.{HdfsUtils, Utils} import org.apache.streampark.flink.client.`trait`.YarnClientTrait import org.apache.streampark.flink.client.bean._ import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse +import org.apache.commons.lang3.StringUtils import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader import org.apache.flink.client.deployment.application.ApplicationConfiguration import org.apache.flink.client.program.ClusterClient import org.apache.flink.configuration._ +import org.apache.flink.python.PythonOptions import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils} import org.apache.flink.runtime.util.HadoopUtils import org.apache.flink.yarn.configuration.YarnConfigOptions import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api.records.ApplicationId +import java.io.File +import java.util import java.util.Collections import java.util.concurrent.Callable @@ -93,6 +98,37 @@ object YarnApplicationClient extends YarnClientTrait { // yarn application Type .safeSet(YarnConfigOptions.APPLICATION_TYPE, submitRequest.applicationType.getName) + if (StringUtils.isNotBlank(submitRequest.pyflinkFilePath)) { + val pythonVenv: String = workspace.APP_PYTHON_VENV + if (!FsOperator.hdfs.exists(pythonVenv)) { + throw new RuntimeException(s"$pythonVenv File does not exist") + } + val pyflinkFile: File = new File(submitRequest.pyflinkFilePath) + + val argList = new util.ArrayList[String]() + argList.add("-pym") + argList.add(pyflinkFile.getName.replace(ConfigConst.PYTHON_SUFFIX, "")) + + // yarn.ship-files + flinkConfig.setString( + YarnConfigOptions.SHIP_FILES.key(), + pyflinkFile.getParentFile.getAbsolutePath) + + flinkConfig + // python.archives + .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv) + // python.client.executable + .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE) + // python.executable + .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE) + // python.files + .safeSet(PythonOptions.PYTHON_FILES, pyflinkFile.getParentFile.getName) + .safeSet( + ApplicationConfiguration.APPLICATION_MAIN_CLASS, + ConfigConst.PYTHON_DRIVER_CLASS_NAME) + .safeSet(ApplicationConfiguration.APPLICATION_ARGS, argList) + } + logInfo(s""" |------------------------------------------------------------------ |Effective submit configuration: $flinkConfig diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 5c65da5bae..153aba6c89 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -20,7 +20,7 @@ package org.apache.streampark.flink.client.`trait` import org.apache.streampark.common.conf.ConfigConst._ import org.apache.streampark.common.conf.Workspace import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode, RestoreMode} -import org.apache.streampark.common.util.{DeflaterUtils, Logger} +import org.apache.streampark.common.util.{DeflaterUtils, EnvUtils, Logger} import org.apache.streampark.flink.client.bean._ import org.apache.streampark.flink.core.FlinkClusterClient import org.apache.streampark.flink.core.conf.FlinkRunOption @@ -141,6 +141,17 @@ trait FlinkClientTrait extends Logger { }) } + if (StringUtils.isNotBlank(submitRequest.pyflinkFilePath)) { + val flinkOptPath: String = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR) + if (StringUtils.isBlank(flinkOptPath)) { + logWarn(s"Get environment variable ${ConfigConstants.ENV_FLINK_OPT_DIR} fail") + val flinkHome = submitRequest.flinkVersion.flinkHome + EnvUtils.setEnv(ConfigConstants.ENV_FLINK_OPT_DIR, s"$flinkHome/opt"); + logInfo( + s"Set temporary environment variables ${ConfigConstants.ENV_FLINK_OPT_DIR} = $flinkHome/opt") + } + } + setConfig(submitRequest, flinkConfig) doSubmit(submitRequest, flinkConfig)