Skip to content

Commit

Permalink
Optimize the jar task submission code and cancel unnecessary resource…
Browse files Browse the repository at this point in the history
… load (DataLinkDC#3616)

Co-authored-by: gaoyan1998 <[email protected]>
  • Loading branch information
gaoyan1998 and gaoyan1998 authored Jun 27, 2024
1 parent ec1abfe commit 9b5c592
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 71 deletions.
75 changes: 5 additions & 70 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.dinky.data.result.ErrorResult;
import org.dinky.data.result.ExplainResult;
import org.dinky.data.result.IResult;
import org.dinky.data.result.InsertResult;
import org.dinky.data.result.ResultBuilder;
import org.dinky.data.result.ResultPool;
import org.dinky.data.result.SelectResult;
Expand Down Expand Up @@ -67,19 +66,15 @@
import org.dinky.utils.SqlUtil;
import org.dinky.utils.URLUtils;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.yarn.configuration.YarnConfigOptions;

Expand All @@ -88,7 +83,6 @@
import java.lang.ref.WeakReference;
import java.net.URL;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -264,72 +258,13 @@ public JobResult executeJarSql(String statement) throws Exception {
statement = String.join(";\n", statements);
job = Job.build(runMode, config, executorConfig, executor, statement, useGateway);
ready();
JobJarStreamGraphBuilder jobJarStreamGraphBuilder = JobJarStreamGraphBuilder.build(this);
Pipeline pipeline = jobJarStreamGraphBuilder.getJarStreamGraph(statement, getDinkyClassLoader());
Configuration configuration =
executor.getCustomTableEnvironment().getConfig().getConfiguration();
if (pipeline instanceof StreamGraph) {
if (Asserts.isNotNullString(config.getSavePointPath())) {
((StreamGraph) pipeline)
.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(
config.getSavePointPath(),
configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)));
}
}
try {
if (!useGateway) {
JobClient jobClient =
FlinkStreamEnvironmentUtil.executeAsync(pipeline, executor.getStreamExecutionEnvironment());
if (Asserts.isNotNull(jobClient)) {
job.setJobId(jobClient.getJobID().toHexString());
job.setJids(new ArrayList<String>() {

{
add(job.getJobId());
}
});
job.setStatus(Job.JobStatus.SUCCESS);
success();
} else {
job.setStatus(Job.JobStatus.FAILED);
failed();
}
JobJarStreamGraphBuilder.build(this).run();
if (job.isFailed()) {
failed();
} else {
GatewayResult gatewayResult;
config.addGatewayConfig(configuration);
if (runMode.isApplicationMode()) {
config.getGatewayConfig().setSql(statement);
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar(getUdfPathContextHolder());
} else {
if (pipeline instanceof StreamGraph) {
((StreamGraph) pipeline).setJobName(config.getJobName());
} else if (pipeline instanceof Plan) {
((Plan) pipeline).setJobName(config.getJobName());
}
JobGraph jobGraph = FlinkStreamEnvironmentUtil.getJobGraph(pipeline, configuration);
GatewayConfig gatewayConfig = config.getGatewayConfig();
List<String> uriList = jobJarStreamGraphBuilder.getUris(statement);
String[] jarPaths = uriList.stream()
.map(URLUtils::toFile)
.map(File::getAbsolutePath)
.toArray(String[]::new);
gatewayConfig.setJarPaths(jarPaths);
gatewayResult = Gateway.build(gatewayConfig).submitJobGraph(jobGraph);
}
job.setResult(InsertResult.success(gatewayResult.getId()));
job.setJobId(gatewayResult.getId());
job.setJids(gatewayResult.getJids());
job.setJobManagerAddress(URLUtils.formatAddress(gatewayResult.getWebURL()));

if (gatewayResult.isSuccess()) {
job.setStatus(Job.JobStatus.SUCCESS);
success();
} else {
job.setStatus(Job.JobStatus.FAILED);
job.setError(gatewayResult.getError());
log.error(gatewayResult.getError());
failed();
}
job.setStatus(Job.JobStatus.SUCCESS);
success();
}
} catch (Exception e) {
String error =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@

package org.dinky.job.builder;

import org.dinky.assertion.Asserts;
import org.dinky.classloader.DinkyClassLoader;
import org.dinky.data.exception.DinkyException;
import org.dinky.data.result.InsertResult;
import org.dinky.gateway.Gateway;
import org.dinky.gateway.config.GatewayConfig;
import org.dinky.gateway.result.GatewayResult;
import org.dinky.job.Job;
import org.dinky.job.JobBuilder;
import org.dinky.job.JobManager;
import org.dinky.parser.SqlType;
Expand All @@ -32,9 +38,18 @@
import org.dinky.trans.parse.ExecuteJarParseStrategy;
import org.dinky.trans.parse.SetSqlParseStrategy;
import org.dinky.utils.DinkyClassLoaderUtil;
import org.dinky.utils.FlinkStreamEnvironmentUtil;
import org.dinky.utils.SqlUtil;
import org.dinky.utils.URLUtils;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.io.File;
import java.net.URL;
Expand All @@ -43,22 +58,103 @@
import java.util.Set;

import cn.hutool.core.lang.Assert;
import lombok.extern.slf4j.Slf4j;

/**
* JobJarStreamGraphBuilder
*/
@Slf4j
public class JobJarStreamGraphBuilder extends JobBuilder {

private final Configuration configuration;

public JobJarStreamGraphBuilder(JobManager jobManager) {
super(jobManager);
configuration = executor.getCustomTableEnvironment().getConfig().getConfiguration();
}

public static JobJarStreamGraphBuilder build(JobManager jobManager) {
return new JobJarStreamGraphBuilder(jobManager);
}

private Pipeline getPipeline() {
Pipeline pipeline = getJarStreamGraph(job.getStatement(), jobManager.getDinkyClassLoader());
if (pipeline instanceof StreamGraph) {
if (Asserts.isNotNullString(config.getSavePointPath())) {
((StreamGraph) pipeline)
.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(
config.getSavePointPath(),
configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)));
}
}
return pipeline;
}

@Override
public void run() throws Exception {}
public void run() throws Exception {
if (!useGateway) {
submitNormal();
} else {
GatewayResult gatewayResult;
if (runMode.isApplicationMode()) {
gatewayResult = submitGateway();
} else {
gatewayResult = submitNormalWithGateway();
}
job.setResult(InsertResult.success(gatewayResult.getId()));
job.setJobId(gatewayResult.getId());
job.setJids(gatewayResult.getJids());
job.setJobManagerAddress(URLUtils.formatAddress(gatewayResult.getWebURL()));

if (gatewayResult.isSuccess()) {
job.setStatus(Job.JobStatus.SUCCESS);
} else {
job.setStatus(Job.JobStatus.FAILED);
job.setError(gatewayResult.getError());
log.error(gatewayResult.getError());
}
}
}

private GatewayResult submitGateway() throws Exception {
config.addGatewayConfig(configuration);
config.getGatewayConfig().setSql(job.getStatement());
return Gateway.build(config.getGatewayConfig()).submitJar(jobManager.getUdfPathContextHolder());
}

private GatewayResult submitNormalWithGateway() {
Pipeline pipeline = getPipeline();
if (pipeline instanceof StreamGraph) {
((StreamGraph) pipeline).setJobName(config.getJobName());
} else if (pipeline instanceof Plan) {
((Plan) pipeline).setJobName(config.getJobName());
}
JobGraph jobGraph = FlinkStreamEnvironmentUtil.getJobGraph(pipeline, configuration);
GatewayConfig gatewayConfig = config.getGatewayConfig();
List<String> uriList = getUris(job.getStatement());
String[] jarPaths = uriList.stream()
.map(URLUtils::toFile)
.map(File::getAbsolutePath)
.toArray(String[]::new);
gatewayConfig.setJarPaths(jarPaths);
return Gateway.build(gatewayConfig).submitJobGraph(jobGraph);
}

private void submitNormal() throws Exception {
JobClient jobClient =
FlinkStreamEnvironmentUtil.executeAsync(getPipeline(), executor.getStreamExecutionEnvironment());
if (Asserts.isNotNull(jobClient)) {
job.setJobId(jobClient.getJobID().toHexString());
job.setJids(new ArrayList<String>() {
{
add(job.getJobId());
}
});
job.setStatus(Job.JobStatus.SUCCESS);
} else {
job.setStatus(Job.JobStatus.FAILED);
}
}

public Pipeline getJarStreamGraph(String statement, DinkyClassLoader dinkyClassLoader) {
DinkyClassLoaderUtil.initClassLoader(config, dinkyClassLoader);
Expand Down

0 comments on commit 9b5c592

Please sign in to comment.