Skip to content

Commit

Permalink
[Fix-2890] Fix 'set xxx = xxx' in flinksql task has no effect (DataLi…
Browse files Browse the repository at this point in the history
…nkDC#2932)

Co-authored-by: wenmo <[email protected]>
  • Loading branch information
aiwenmo and aiwenmo authored Jan 6, 2024
1 parent 944b88d commit 1029281
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 5 deletions.
3 changes: 2 additions & 1 deletion dinky-admin/src/main/resources/dinky-loader/FlinkConfClass
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ org.apache.flink.configuration.HistoryServerOptions
org.apache.flink.configuration.MetricOptions
org.apache.flink.configuration.NettyShuffleEnvironmentOptions
org.apache.flink.configuration.RestartStrategyOptions
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
org.apache.flink.yarn.configuration.YarnConfigOptions
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
org.dinky.constant.CustomerConfigureOptions
org.dinky.constant.CustomerConfigureOptions
8 changes: 8 additions & 0 deletions dinky-core/src/main/java/org/dinky/job/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.dinky.gateway.enums.SavePointStrategy;
import org.dinky.gateway.model.FlinkClusterConfig;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.RestOptions;

Expand Down Expand Up @@ -237,6 +238,13 @@ public void addGatewayConfig(Map<String, String> config) {
}
}

public void addGatewayConfig(Configuration config) {
if (Asserts.isNull(gatewayConfig)) {
gatewayConfig = new GatewayConfig();
}
gatewayConfig.getFlinkConfig().getConfiguration().putAll(config.toMap());
}

public boolean isUseRemote() {
return useRemote || !GatewayType.LOCAL.equalsValue(type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ private GatewayResult submitByGateway(List<String> inserts) {

GatewayResult gatewayResult = null;

// Use gateway need to build gateway config, include flink configeration.
config.addGatewayConfig(executor.getSetConfig());
// Use gateway need to build gateway config, include flink configuration.
config.addGatewayConfig(executor.getCustomTableEnvironment().getConfig().getConfiguration());

if (runMode.isApplicationMode()) {
// Application mode need to submit dinky-app.jar that in the hdfs or image.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,6 @@ public class FlinkConfig {

private static final ObjectMapper mapper = new ObjectMapper();

public static final String DEFAULT_SAVEPOINT_PREFIX = "hdfs:///flink/savepoints/";

public FlinkConfig() {}

public FlinkConfig(Map<String, String> configuration) {
Expand Down

0 comments on commit 1029281

Please sign in to comment.