[Feature] Yarn supports pyflink (apache#2956)
* yarn application supports pyflink

* yarn perjob supports pyflink

* pyflink supports flink connector
ChengJie1053 authored and saLeox committed Sep 4, 2023
1 parent 2001a88 commit 09ae6b9
Showing 16 changed files with 280 additions and 16 deletions.
@@ -26,6 +26,14 @@ object ConfigConst {

val PARAM_PREFIX = "--"

/** pyflink */

val PYTHON_SUFFIX = ".py"

val PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver"

val PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3"

/** about parameter... */

val KEY_APP_HOME = "app.home"
@@ -102,6 +102,8 @@ case class Workspace(storageType: StorageType) {

lazy val APP_UPLOADS = s"$WORKSPACE/uploads"

lazy val APP_PYTHON_VENV = s"$WORKSPACE/python/venv.zip"

lazy val APP_WORKSPACE = s"$WORKSPACE/workspace"

lazy val APP_FLINK = s"$WORKSPACE/flink"
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.common.util;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Map;


public class EnvUtils {

  /** Sets an environment variable in the running JVM's otherwise read-only environment map. */
  public static void setEnv(String name, String value) throws Exception {
    getModifiableEnvironment().put(name, value);
  }

  @SuppressWarnings("unchecked")
  private static Map<String, String> getModifiableEnvironment() throws Exception {
    // ProcessEnvironment.getenv() returns the process environment as an unmodifiable map.
    Class<?> pe = Class.forName("java.lang.ProcessEnvironment");
    Method getenv = pe.getDeclaredMethod("getenv");
    getenv.setAccessible(true);
    Object unmodifiableEnvironment = getenv.invoke(null);
    // Reach through Collections$UnmodifiableMap to its mutable backing map.
    // Note: on JDK 16+ this reflection needs --add-opens java.base/java.util and java.base/java.lang.
    Class<?> map = Class.forName("java.util.Collections$UnmodifiableMap");
    Field m = map.getDeclaredField("m");
    m.setAccessible(true);
    return (Map<String, String>) m.get(unmodifiableEnvironment);
  }
}

@@ -69,6 +69,11 @@
<outputDirectory>logs</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>${project.build.directory}/../src/main/assembly/python</directory>
<outputDirectory>python</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>${project.build.directory}/../src/main/assembly/temp</directory>
<outputDirectory>temp</outputDirectory>
Empty file.
@@ -1559,6 +1559,14 @@ public void start(Application appParam, boolean auto) throws Exception {
String applicationArgs =
variableService.replaceVariable(application.getTeamId(), application.getArgs());

String pyflinkFilePath = "";
Resource resource =
resourceService.findByResourceName(application.getTeamId(), application.getJar());
if (resource != null
&& StringUtils.isNotBlank(resource.getFilePath())
&& resource.getFilePath().endsWith(ConfigConst.PYTHON_SUFFIX())) {
pyflinkFilePath = resource.getFilePath();
}
SubmitRequest submitRequest =
new SubmitRequest(
flinkEnv.getFlinkVersion(),
@@ -1574,6 +1582,7 @@ public void start(Application appParam, boolean auto) throws Exception {
getSavePointed(appParam),
appParam.getRestoreMode() == null ? null : RestoreMode.of(appParam.getRestoreMode()),
applicationArgs,
pyflinkFilePath,
buildResult,
kubernetesSubmitParam,
extraParameter);
@@ -17,6 +17,7 @@

package org.apache.streampark.console.core.service.impl;

import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.Utils;
@@ -286,6 +287,9 @@ public RestResponse checkResource(Resource resourceParam) throws JsonProcessingE
resp.put("exception", Utils.stringifyException(e));
return RestResponse.success().data(resp);
}
if (jarFile.getName().endsWith(ConfigConst.PYTHON_SUFFIX())) {
return RestResponse.success().data(resp);
}
Manifest manifest = Utils.getJarManifest(jarFile);
String mainClass = manifest.getMainAttributes().getValue("Main-Class");

@@ -62,7 +62,7 @@ export default {
programJar: 'Program Jar',
dynamicProperties: 'Dynamic Properties',
hadoopConfigTitle: 'System Hadoop Configuration',
dragUploadTitle: 'Click or drag jar to this area to upload',
dragUploadTitle: 'Click or drag jar or py to this area to upload',
dragUploadTip:
'Support for a single upload. You can upload a local jar here to support for current Job',
dependencyError: 'please set flink version first.',
@@ -62,7 +62,7 @@ export default {
programJar: '程序Jar文件',
dynamicProperties: '动态参数',
hadoopConfigTitle: '系统 Hadoop',
dragUploadTitle: '单击或拖动 jar 到此区域以上传',
dragUploadTitle: '单击或拖动 jar或py 到此区域以上传',
dragUploadTip: '支持单次上传。您可以在此处上传本地 jar 以支持当前作业',
dependencyError: '请先检查flink 版本.',
status: '运行状态',
@@ -53,7 +53,7 @@
/* Callback before file upload */
function handleBeforeUpload(file) {
if (file.type !== 'application/java-archive') {
if (!/\.(jar|JAR)$/.test(file.name)) {
if (!/\.(jar|JAR|py)$/.test(file.name)) {
emit('update:loading', false);
createMessage.error('Only jar or py files can be uploaded! Please check your file.');
return false;
@@ -63,6 +63,7 @@ case class SubmitRequest(
savePoint: String,
restoreMode: RestoreMode,
args: String,
pyflinkFilePath: String = "",
@Nullable buildResult: BuildResult,
@Nullable k8sSubmitParam: KubernetesSubmitParam,
@Nullable extraParameter: JavaMap[String, Any]) {
@@ -64,6 +64,13 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
@@ -0,0 +1,87 @@

### 1. Create a Python virtual environment on Linux

#### 1.1. Prepare the 'setup-pyflink-virtual-env.sh' script. The default 'apache-flink' version is '1.16.2'; change it as required. The content is as follows

```shell
set -e
# Download the Python 3.7 miniconda.sh installer.
wget "https://repo.continuum.io/miniconda/Miniconda3-py37_4.9.2-Linux-x86_64.sh" -O "miniconda.sh"

# Make the miniconda.sh installer executable.
chmod +x miniconda.sh

# Create the Python virtual environment.
./miniconda.sh -b -p venv

# Activate the Conda Python virtual environment.
source venv/bin/activate ""

# Install the PyFlink dependency.
pip install "apache-flink==1.16.2"

# Deactivate the Conda Python virtual environment.
conda deactivate

# Remove cached packages.
rm -rf venv/pkgs

# Package the prepared Conda Python virtual environment.
zip -r venv.zip venv
```

#### 1.2. Prepare the 'build.sh' script. The content is as follows

Create the '/build' directory yourself; you can also change it to another directory.

```shell
#!/bin/bash
set -e -x
yum install -y zip wget

cd /root/
bash /build/setup-pyflink-virtual-env.sh
mv venv.zip /build/
```

#### 1.3. Execute the following command

```shell
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux2014_x86_64 ./build.sh
```

After this command finishes, a file named venv.zip is generated; it contains the Python 3.7 virtual environment. You can also modify the script above to install a different Python version, or to install the third-party Python packages your job requires into the virtual environment.

### 2. Upload venv.zip to HDFS

venv.zip is about 539 MB; upload it to HDFS yourself:

```shell
hadoop fs -put ./venv.zip /streampark/python
```

### 3. Copy venv.zip to $WORKSPACE/python

```shell
cp ./venv.zip $WORKSPACE/python
```

### 4. Copy Python dependencies to $FLINK_HOME/lib

```shell
cp -r $FLINK_HOME/opt/python $FLINK_HOME/lib

cp $FLINK_HOME/opt/flink-python-* $FLINK_HOME/lib
```

### 5. If your PyFlink job uses a Flink connector dependency, put its jar in $FLINK_HOME/lib (see the sketch below)
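
For illustration, here is a hedged sketch of a PyFlink job whose source table depends on such a connector jar. The Kafka DDL below only resolves if a matching flink-sql-connector-kafka jar has been copied into $FLINK_HOME/lib; the topic, addresses, and options are illustrative, not part of this change.

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# 'connector' = 'kafka' is served by a flink-sql-connector-kafka jar in
# $FLINK_HOME/lib; without it, planning the INSERT below fails.
t_env.execute_sql("""
    CREATE TABLE events (
        msg STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'demo',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'pyflink-demo',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'raw'
    )
""")

# The print connector is built into Flink and needs no extra jar.
t_env.execute_sql("CREATE TABLE console (msg STRING) WITH ('connector' = 'print')")

t_env.execute_sql("INSERT INTO console SELECT msg FROM events").wait()
```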

### 6. Reference documents
```text
https://help.aliyun.com/document_detail/413966.html#:~:text=.%2Fsetup-pyflink-%20virtual%20-env.sh,%E8%AF%A5%E5%91%BD%E4%BB%A4%E6%89%A7%E8%A1%8C%E5%AE%8C%E6%88%90%E5%90%8E%EF%BC%8C%E4%BC%9A%E7%94%9F%E6%88%90%E4%B8%80%E4%B8%AA%E5%90%8D%E4%B8%BA%20venv%20%E7%9A%84%E7%9B%AE%E5%BD%95%EF%BC%8C%E5%8D%B3%E4%B8%BAPython%203.6%E7%9A%84%E8%99%9A%E6%8B%9F%E7%8E%AF%E5%A2%83%E3%80%82
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/deployment/cli/#submitting-pyflink-jobs
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/python/python_config/
```
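
### 7. Optional: a minimal job to smoke-test the setup

To verify steps 1-4 end to end before involving connectors, you can upload a small job that uses only connectors bundled with Flink. This is an illustrative sketch, not part of this change; it generates one row per second and prints it:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# datagen and print ship with Flink, so steps 1-4 are the only prerequisites.
t_env.execute_sql("""
    CREATE TABLE source (num BIGINT) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '1'
    )
""")
t_env.execute_sql("CREATE TABLE sink (num BIGINT) WITH ('connector' = 'print')")

t_env.execute_sql("INSERT INTO sink SELECT num FROM source").wait()
```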


@@ -17,23 +17,29 @@

package org.apache.streampark.flink.client.impl

import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.conf.{ConfigConst, Workspace}
import org.apache.streampark.common.enums.DevelopmentMode
import org.apache.streampark.common.fs.FsOperator
import org.apache.streampark.common.util.{HdfsUtils, Utils}
import org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
import org.apache.streampark.flink.util.FlinkUtils

import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
import org.apache.flink.client.deployment.application.ApplicationConfiguration
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration._
import org.apache.flink.python.PythonOptions
import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils}
import org.apache.flink.runtime.util.HadoopUtils
import org.apache.flink.yarn.configuration.YarnConfigOptions
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.ApplicationId

import java.io.File
import java.util
import java.util.Collections
import java.util.concurrent.Callable

@@ -93,6 +99,43 @@ object YarnApplicationClient extends YarnClientTrait {
// yarn application Type
.safeSet(YarnConfigOptions.APPLICATION_TYPE, submitRequest.applicationType.getName)

if (StringUtils.isNotBlank(submitRequest.pyflinkFilePath)) {
val pythonVenv: String = workspace.APP_PYTHON_VENV
if (!FsOperator.hdfs.exists(pythonVenv)) {
throw new RuntimeException(s"$pythonVenv does not exist")
}
val pyflinkFile: File = new File(submitRequest.pyflinkFilePath)

val argList = new util.ArrayList[String]()
argList.add("-pym")
argList.add(pyflinkFile.getName.replace(ConfigConst.PYTHON_SUFFIX, ""))

val pythonFlinkconnectorJars: String =
FlinkUtils.getPythonFlinkconnectorJars(submitRequest.flinkVersion.flinkHome)
if (StringUtils.isNotBlank(pythonFlinkconnectorJars)) {
flinkConfig.setString(PipelineOptions.JARS.key(), pythonFlinkconnectorJars)
}

// yarn.ship-files
flinkConfig.setString(
YarnConfigOptions.SHIP_FILES.key(),
pyflinkFile.getParentFile.getAbsolutePath)

flinkConfig
// python.archives
.safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
// python.client.executable
.safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
// python.executable
.safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
// python.files
.safeSet(PythonOptions.PYTHON_FILES, pyflinkFile.getParentFile.getName)
.safeSet(
ApplicationConfiguration.APPLICATION_MAIN_CLASS,
ConfigConst.PYTHON_DRIVER_CLASS_NAME)
.safeSet(ApplicationConfiguration.APPLICATION_ARGS, argList)
}

logInfo(s"""
|------------------------------------------------------------------
|Effective submit configuration: $flinkConfig