[Feature] Yarn supports pyflink #2956

Merged · 3 commits · Sep 1, 2023
@@ -26,6 +26,14 @@ object ConfigConst {

val PARAM_PREFIX = "--"

/** pyflink */

val PYTHON_SUFFIX = ".py"

val PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver"

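// interpreter path inside the venv.zip archive once YARN unpacks it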
val PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3"

/** about parameter... */

val KEY_APP_HOME = "app.home"
@@ -102,6 +102,8 @@ case class Workspace(storageType: StorageType) {

lazy val APP_UPLOADS = s"$WORKSPACE/uploads"

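// packaged Python virtual environment (venv.zip) shipped with PyFlink jobs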
lazy val APP_PYTHON_VENV = s"$WORKSPACE/python/venv.zip"

lazy val APP_WORKSPACE = s"$WORKSPACE/workspace"

lazy val APP_FLINK = s"$WORKSPACE/flink"
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.common.util;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Map;


/**
 * Utility for mutating the JVM process environment at runtime. Note: this relies on
 * reflective access to JDK internals (java.lang.ProcessEnvironment), which may require
 * --add-opens java.base/java.lang=ALL-UNNAMED on newer JDKs.
 */
public class EnvUtils {

  public static void setEnv(String name, String value) throws Exception {
    getModifiableEnvironment().put(name, value);
  }

  @SuppressWarnings("unchecked")
  private static Map<String, String> getModifiableEnvironment() throws Exception {
    // System.getenv() only exposes an unmodifiable view; reach through to the
    // mutable map backing java.lang.ProcessEnvironment.
    Class<?> pe = Class.forName("java.lang.ProcessEnvironment");
    Method getenv = pe.getDeclaredMethod("getenv");
    getenv.setAccessible(true);
    Object unmodifiableEnvironment = getenv.invoke(null);
    // Unwrap Collections$UnmodifiableMap to obtain its backing field "m".
    Class<?> map = Class.forName("java.util.Collections$UnmodifiableMap");
    Field m = map.getDeclaredField("m");
    m.setAccessible(true);
    return (Map<String, String>) m.get(unmodifiableEnvironment);
  }
}

@@ -68,6 +68,11 @@
<outputDirectory>logs</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>${project.build.directory}/../src/main/assembly/python</directory>
<outputDirectory>python</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>${project.build.directory}/../src/main/assembly/temp</directory>
<outputDirectory>temp</outputDirectory>
@@ -1559,6 +1559,14 @@ public void start(Application appParam, boolean auto) throws Exception {
String applicationArgs =
variableService.replaceVariable(application.getTeamId(), application.getArgs());

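// a job whose resource file ends with .py is treated as a PyFlink job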
String pyflinkFilePath = "";
Resource resource =
resourceService.findByResourceName(application.getTeamId(), application.getJar());
if (resource != null
&& StringUtils.isNotBlank(resource.getFilePath())
&& resource.getFilePath().endsWith(ConfigConst.PYTHON_SUFFIX())) {
pyflinkFilePath = resource.getFilePath();
}
SubmitRequest submitRequest =
new SubmitRequest(
flinkEnv.getFlinkVersion(),
@@ -1574,6 +1582,7 @@
getSavePointed(appParam),
appParam.getRestoreMode() == null ? null : RestoreMode.of(appParam.getRestoreMode()),
applicationArgs,
pyflinkFilePath,
buildResult,
kubernetesSubmitParam,
extraParameter);
@@ -17,6 +17,7 @@

package org.apache.streampark.console.core.service.impl;

import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.Utils;
@@ -287,6 +288,9 @@ public RestResponse checkResource(Resource resourceParam) throws JsonProcessingException {
resp.put("exception", Utils.stringifyException(e));
return RestResponse.success().data(resp);
}
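// a .py resource has no jar manifest, so skip the Main-Class check below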
if (jarFile.getName().endsWith(ConfigConst.PYTHON_SUFFIX())) {
return RestResponse.success().data(resp);
}
Manifest manifest = Utils.getJarManifest(jarFile);
String mainClass = manifest.getMainAttributes().getValue("Main-Class");

@@ -62,7 +62,7 @@ export default {
programJar: 'Program Jar',
dynamicProperties: 'Dynamic Properties',
hadoopConfigTitle: 'System Hadoop Configuration',
dragUploadTitle: 'Click or drag jar to this area to upload',
dragUploadTitle: 'Click or drag jar or py to this area to upload',
dragUploadTip:
'Support for a single upload. You can upload a local jar here to support for current Job',
dependencyError: 'please set flink version first.',
@@ -62,7 +62,7 @@ export default {
programJar: '程序Jar文件',
dynamicProperties: '动态参数',
hadoopConfigTitle: '系统 Hadoop',
dragUploadTitle: '单击或拖动 jar 到此区域以上传',
dragUploadTitle: '单击或拖动 jar或py 到此区域以上传',
Member: It is best to modify the English internationalization file synchronously (lang/en/flink/app.ts)

Member Author: Ok, thank you for reviewing the code

dragUploadTip: '支持单次上传。您可以在此处上传本地 jar 以支持当前作业',
dependencyError: '请先检查flink 版本.',
status: '运行状态',
@@ -53,7 +53,7 @@
/* Callback before file upload */
function handleBeforeUpload(file) {
if (file.type !== 'application/java-archive') {
if (!/\.(jar|JAR)$/.test(file.name)) {
if (!/\.(jar|JAR|py)$/.test(file.name)) {
emit('update:loading', false);
createMessage.error('Only jar or py files can be uploaded! Please check your file.');
return false;
@@ -63,6 +63,7 @@ case class SubmitRequest(
savePoint: String,
restoreMode: RestoreMode,
args: String,
pyflinkFilePath: String = "",
@Nullable buildResult: BuildResult,
@Nullable k8sSubmitParam: KubernetesSubmitParam,
@Nullable extraParameter: JavaMap[String, Any]) {
@@ -64,6 +64,13 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
@@ -0,0 +1,87 @@

### 1. Create a Python virtual environment on Linux

#### 1.1. Prepare the 'setup-pyflink-virtual-env.sh' script. The default 'apache-flink' version is '1.16.2'; you can change it as required. The content is as follows

```shell
set -e
# Download the Python 3.7 miniconda.sh installer.
wget "https://repo.continuum.io/miniconda/Miniconda3-py37_4.9.2-Linux-x86_64.sh" -O "miniconda.sh"

# Make the miniconda.sh installer executable.
chmod +x miniconda.sh

# Create the Python virtual environment.
./miniconda.sh -b -p venv

# Activate the Conda virtual environment.
source venv/bin/activate ""

# Install the PyFlink dependency.
pip install "apache-flink==1.16.2"

# Deactivate the Conda virtual environment.
conda deactivate

# Remove cached packages.
rm -rf venv/pkgs

# Package the prepared virtual environment.
zip -r venv.zip venv
```

#### 1.2. Prepare the 'build.sh' script. The content is as follows

The '/build' directory must be created manually; you can also change it to another directory

```shell
#!/bin/bash
set -e -x
yum install -y zip wget

cd /root/
bash /build/setup-pyflink-virtual-env.sh
mv venv.zip /build/
```

#### 1.3. Execute the following command

```shell
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux2014_x86_64 ./build.sh
```
After this command finishes, a file named venv.zip is generated, containing the Python 3.7 virtual environment. You can also modify the above script to install a different Python version, or to install the third-party Python packages your jobs require.

### 2. Upload venv.zip to HDFS

venv.zip is about 539 MB and must be uploaded to HDFS manually

```shell
hadoop fs -put ./venv.zip /streampark/python
```

### 3. Copy venv.zip to $WORKSPACE/python

```shell
cp ./venv.zip $WORKSPACE/python
```

### 4. Copy Python dependencies to $FLINK_HOME/lib

```shell
cp -r $FLINK_HOME/opt/python $FLINK_HOME/lib

cp $FLINK_HOME/opt/flink-python-* $FLINK_HOME/lib
```

### 5. If your pyflink job uses a flink connector dependency, put the connector jar in $FLINK_HOME/lib
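For reference, here is a minimal PyFlink job sketch that exercises this step. The Kafka topic, bootstrap servers, and group id below are illustrative placeholders, and the job assumes a flink-sql-connector-kafka jar has already been placed in $FLINK_HOME/lib:

```python
# word_count.py -- illustrative PyFlink Table API job (not part of this PR);
# topic/server names are placeholders.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Kafka source: requires the connector jar from step 5 in $FLINK_HOME/lib.
t_env.execute_sql("""
    CREATE TABLE words (
        word STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'words',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'pyflink-demo',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
""")

# Print sink: built into Flink, no extra jar needed.
t_env.execute_sql("""
    CREATE TABLE word_counts (
        word STRING,
        cnt BIGINT
    ) WITH ('connector' = 'print')
""")

t_env.execute_sql("""
    INSERT INTO word_counts
    SELECT word, COUNT(*) AS cnt FROM words GROUP BY word
""").wait()
```

Upload such a .py file as the job resource; the client then ships its parent directory and the venv archive, and launches it through org.apache.flink.client.python.PythonDriver as configured in this PR's YarnApplicationClient changes.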

### 6. Reference documents
```text
https://help.aliyun.com/document_detail/413966.html#:~:text=.%2Fsetup-pyflink-%20virtual%20-env.sh,%E8%AF%A5%E5%91%BD%E4%BB%A4%E6%89%A7%E8%A1%8C%E5%AE%8C%E6%88%90%E5%90%8E%EF%BC%8C%E4%BC%9A%E7%94%9F%E6%88%90%E4%B8%80%E4%B8%AA%E5%90%8D%E4%B8%BA%20venv%20%E7%9A%84%E7%9B%AE%E5%BD%95%EF%BC%8C%E5%8D%B3%E4%B8%BAPython%203.6%E7%9A%84%E8%99%9A%E6%8B%9F%E7%8E%AF%E5%A2%83%E3%80%82

https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/deployment/cli/#submitting-pyflink-jobs

https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/python/python_config/
```


@@ -17,23 +17,29 @@

package org.apache.streampark.flink.client.impl

import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.conf.{ConfigConst, Workspace}
import org.apache.streampark.common.enums.DevelopmentMode
import org.apache.streampark.common.fs.FsOperator
import org.apache.streampark.common.util.{HdfsUtils, Utils}
import org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
import org.apache.streampark.flink.util.FlinkUtils

import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
import org.apache.flink.client.deployment.application.ApplicationConfiguration
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration._
import org.apache.flink.python.PythonOptions
import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils}
import org.apache.flink.runtime.util.HadoopUtils
import org.apache.flink.yarn.configuration.YarnConfigOptions
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.ApplicationId

import java.io.File
import java.util
import java.util.Collections
import java.util.concurrent.Callable

@@ -93,6 +99,43 @@ object YarnApplicationClient extends YarnClientTrait {
// yarn application Type
.safeSet(YarnConfigOptions.APPLICATION_TYPE, submitRequest.applicationType.getName)

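// PyFlink job: ship the script's directory and the packaged venv, then launch through Flink's PythonDriver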
if (StringUtils.isNotBlank(submitRequest.pyflinkFilePath)) {
val pythonVenv: String = workspace.APP_PYTHON_VENV
if (!FsOperator.hdfs.exists(pythonVenv)) {
throw new RuntimeException(s"$pythonVenv File does not exist")
}
val pyflinkFile: File = new File(submitRequest.pyflinkFilePath)

val argList = new util.ArrayList[String]()
argList.add("-pym")
argList.add(pyflinkFile.getName.replace(ConfigConst.PYTHON_SUFFIX, ""))

val pythonFlinkconnectorJars: String =
FlinkUtils.getPythonFlinkconnectorJars(submitRequest.flinkVersion.flinkHome)
if (StringUtils.isNotBlank(pythonFlinkconnectorJars)) {
flinkConfig.setString(PipelineOptions.JARS.key(), pythonFlinkconnectorJars)
}

// yarn.ship-files
flinkConfig.setString(
YarnConfigOptions.SHIP_FILES.key(),
pyflinkFile.getParentFile.getAbsolutePath)

flinkConfig
// python.archives
.safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
// python.client.executable
.safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
// python.executable
.safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
// python.files
.safeSet(PythonOptions.PYTHON_FILES, pyflinkFile.getParentFile.getName)
.safeSet(
ApplicationConfiguration.APPLICATION_MAIN_CLASS,
ConfigConst.PYTHON_DRIVER_CLASS_NAME)
.safeSet(ApplicationConfiguration.APPLICATION_ARGS, argList)
}

logInfo(s"""
|------------------------------------------------------------------
|Effective submit configuration: $flinkConfig