diff --git a/dinky-admin/src/main/resources/dinky-loader/FlinkConfClass b/dinky-admin/src/main/resources/dinky-loader/FlinkConfClass index f613e94441..2c281af895 100644 --- a/dinky-admin/src/main/resources/dinky-loader/FlinkConfClass +++ b/dinky-admin/src/main/resources/dinky-loader/FlinkConfClass @@ -25,6 +25,7 @@ org.apache.flink.configuration.HistoryServerOptions org.apache.flink.configuration.MetricOptions org.apache.flink.configuration.NettyShuffleEnvironmentOptions org.apache.flink.configuration.RestartStrategyOptions +org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions org.apache.flink.yarn.configuration.YarnConfigOptions org.apache.flink.kubernetes.configuration.KubernetesConfigOptions -org.dinky.constant.CustomerConfigureOptions \ No newline at end of file +org.dinky.constant.CustomerConfigureOptions diff --git a/dinky-core/src/main/java/org/dinky/job/JobConfig.java b/dinky-core/src/main/java/org/dinky/job/JobConfig.java index 3784a2537a..96a8452c32 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobConfig.java +++ b/dinky-core/src/main/java/org/dinky/job/JobConfig.java @@ -28,6 +28,7 @@ import org.dinky.gateway.enums.SavePointStrategy; import org.dinky.gateway.model.FlinkClusterConfig; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.RestOptions; @@ -237,6 +238,13 @@ public void addGatewayConfig(Map config) { } } + public void addGatewayConfig(Configuration config) { + if (Asserts.isNull(gatewayConfig)) { + gatewayConfig = new GatewayConfig(); + } + gatewayConfig.getFlinkConfig().getConfiguration().putAll(config.toMap()); + } + public boolean isUseRemote() { return useRemote || !GatewayType.LOCAL.equalsValue(type); } diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java index dcc2c9e25a..d23e92577e 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java @@ -183,8 +183,8 @@ private GatewayResult submitByGateway(List inserts) { GatewayResult gatewayResult = null; - // Use gateway need to build gateway config, include flink configeration. - config.addGatewayConfig(executor.getSetConfig()); + // Use gateway need to build gateway config, include flink configuration. + config.addGatewayConfig(executor.getCustomTableEnvironment().getConfig().getConfiguration()); if (runMode.isApplicationMode()) { // Application mode need to submit dinky-app.jar that in the hdfs or image. diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/config/FlinkConfig.java b/dinky-gateway/src/main/java/org/dinky/gateway/config/FlinkConfig.java index 16da3d63ec..a6ad34c37f 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/config/FlinkConfig.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/config/FlinkConfig.java @@ -107,8 +107,6 @@ public class FlinkConfig { private static final ObjectMapper mapper = new ObjectMapper(); - public static final String DEFAULT_SAVEPOINT_PREFIX = "hdfs:///flink/savepoints/"; - public FlinkConfig() {} public FlinkConfig(Map configuration) {