From 1029281870717f8a6dffa0bdd4c681cf81df91d1 Mon Sep 17 00:00:00 2001 From: aiwenmo <32723967+aiwenmo@users.noreply.github.com> Date: Sun, 7 Jan 2024 00:24:09 +0800 Subject: [PATCH] [Fix-2890] Fix 'set xxx = xxx' in flinksql task has no effect (#2932) Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com> --- .../src/main/resources/dinky-loader/FlinkConfClass | 3 ++- dinky-core/src/main/java/org/dinky/job/JobConfig.java | 8 ++++++++ .../main/java/org/dinky/job/builder/JobTransBuilder.java | 4 ++-- .../main/java/org/dinky/gateway/config/FlinkConfig.java | 2 -- 4 files changed, 12 insertions(+), 5 deletions(-) diff --git a/dinky-admin/src/main/resources/dinky-loader/FlinkConfClass b/dinky-admin/src/main/resources/dinky-loader/FlinkConfClass index f613e94441..2c281af895 100644 --- a/dinky-admin/src/main/resources/dinky-loader/FlinkConfClass +++ b/dinky-admin/src/main/resources/dinky-loader/FlinkConfClass @@ -25,6 +25,7 @@ org.apache.flink.configuration.HistoryServerOptions org.apache.flink.configuration.MetricOptions org.apache.flink.configuration.NettyShuffleEnvironmentOptions org.apache.flink.configuration.RestartStrategyOptions +org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions org.apache.flink.yarn.configuration.YarnConfigOptions org.apache.flink.kubernetes.configuration.KubernetesConfigOptions -org.dinky.constant.CustomerConfigureOptions \ No newline at end of file +org.dinky.constant.CustomerConfigureOptions diff --git a/dinky-core/src/main/java/org/dinky/job/JobConfig.java b/dinky-core/src/main/java/org/dinky/job/JobConfig.java index 3784a2537a..96a8452c32 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobConfig.java +++ b/dinky-core/src/main/java/org/dinky/job/JobConfig.java @@ -28,6 +28,7 @@ import org.dinky.gateway.enums.SavePointStrategy; import org.dinky.gateway.model.FlinkClusterConfig; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.RestOptions; @@ -237,6 +238,13 @@ public void addGatewayConfig(Map config) { } } + public void addGatewayConfig(Configuration config) { + if (Asserts.isNull(gatewayConfig)) { + gatewayConfig = new GatewayConfig(); + } + gatewayConfig.getFlinkConfig().getConfiguration().putAll(config.toMap()); + } + public boolean isUseRemote() { return useRemote || !GatewayType.LOCAL.equalsValue(type); } diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java index dcc2c9e25a..d23e92577e 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java @@ -183,8 +183,8 @@ private GatewayResult submitByGateway(List inserts) { GatewayResult gatewayResult = null; - // Use gateway need to build gateway config, include flink configeration. - config.addGatewayConfig(executor.getSetConfig()); + // Use gateway need to build gateway config, include flink configuration. + config.addGatewayConfig(executor.getCustomTableEnvironment().getConfig().getConfiguration()); if (runMode.isApplicationMode()) { // Application mode need to submit dinky-app.jar that in the hdfs or image. diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/config/FlinkConfig.java b/dinky-gateway/src/main/java/org/dinky/gateway/config/FlinkConfig.java index 16da3d63ec..a6ad34c37f 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/config/FlinkConfig.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/config/FlinkConfig.java @@ -107,8 +107,6 @@ public class FlinkConfig { private static final ObjectMapper mapper = new ObjectMapper(); - public static final String DEFAULT_SAVEPOINT_PREFIX = "hdfs:///flink/savepoints/"; - public FlinkConfig() {} public FlinkConfig(Map configuration) {