diff --git a/cdap-app-templates/cdap-etl/cdap-data-streams-base/src/main/java/io/cdap/cdap/datastreams/DataStreamsSparkLauncher.java b/cdap-app-templates/cdap-etl/cdap-data-streams-base/src/main/java/io/cdap/cdap/datastreams/DataStreamsSparkLauncher.java
index 865f9939c276..06aebd69682f 100644
--- a/cdap-app-templates/cdap-etl/cdap-data-streams-base/src/main/java/io/cdap/cdap/datastreams/DataStreamsSparkLauncher.java
+++ b/cdap-app-templates/cdap-etl/cdap-data-streams-base/src/main/java/io/cdap/cdap/datastreams/DataStreamsSparkLauncher.java
@@ -79,8 +79,7 @@ protected void configure() {
   @Override
   public void initialize() throws Exception {
     SparkClientContext context = getContext();
-    String arguments =
-        Joiner.on(", ").withKeyValueSeparator("=").join(context.getRuntimeArguments());
+    String arguments = Joiner.on(", ").withKeyValueSeparator("=").join(context.getRuntimeArguments());
     WRAPPERLOGGER.info("Pipeline '{}' is started by user '{}' with arguments {}",
                        context.getApplicationSpecification().getName(),
                        UserGroupInformation.getCurrentUser().getShortUserName(),
@@ -127,25 +126,18 @@ public void initialize() throws Exception {
       }
     }
     sparkConf.set("spark.streaming.backpressure.enabled", "true");
-    sparkConf.set(
-        "spark.spark.streaming.blockInterval", String.valueOf(spec.getBatchIntervalMillis() / 5));
-    String maxFetchSize = String.valueOf(Integer.MAX_VALUE - 512);
-    sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", maxFetchSize);
+    sparkConf.set("spark.spark.streaming.blockInterval", String.valueOf(spec.getBatchIntervalMillis() / 5));
+    // NOTE: If you change this value, also update io.netty.maxDirectMemory in
+    // KubeMasterEnvironment
+    sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", String.valueOf(Integer.MAX_VALUE - 512));
 
     // spark... makes you set this to at least the number of receivers (streaming sources)
     // because it holds one thread per receiver, or one core in distributed mode.
     // so... we have to set this hacky master variable based on the isUnitTest setting in the config
     String extraOpts = spec.getExtraJavaOpts();
-    // In Spark v3.2.0 and later, max netty direct memory must be
-    // >= spark.network.maxRemoteBlockSizeFetchToMem for executors.
-    // So we set max netty direct memory = spark.network.maxRemoteBlockSizeFetchToMem
-    // See CDAP-20758 for details.
-    String nettyMaxDirectMemory = String.format("-Dio.netty.maxDirectMemory=%s", maxFetchSize);
     if (extraOpts != null && !extraOpts.isEmpty()) {
       sparkConf.set("spark.driver.extraJavaOptions", extraOpts);
-      sparkConf.set("spark.executor.extraJavaOptions", nettyMaxDirectMemory + " " + extraOpts);
-    } else {
-      sparkConf.set("spark.executor.extraJavaOptions", nettyMaxDirectMemory);
+      sparkConf.set("spark.executor.extraJavaOptions", extraOpts);
     }
     // without this, stopping will hang on machines with few cores.
     sparkConf.set("spark.rpc.netty.dispatcher.numThreads", String.valueOf(numSources + 2));
@@ -165,11 +157,8 @@ public void initialize() throws Exception {
       try {
         int numExecutors = Integer.parseInt(property.getValue());
         if (numExecutors < minExecutors) {
-          LOG.warn(
-              "Number of executors {} is less than the minimum number required to run the"
-                  + " pipeline. Automatically increasing it to {}",
-              numExecutors,
-              minExecutors);
+          LOG.warn("Number of executors {} is less than the minimum number required to run the pipeline. " +
+              "Automatically increasing it to {}", numExecutors, minExecutors);
           numExecutors = minExecutors;
         }
         sparkConf.set(property.getKey(), String.valueOf(numExecutors));
@@ -199,10 +188,9 @@ public void initialize() throws Exception {
   public void destroy() {
     super.destroy();
     ProgramStatus status = getContext().getState().getStatus();
-    WRAPPERLOGGER.info(
-        "Pipeline '{}' {}",
-        getContext().getApplicationSpecification().getName(),
-        status == ProgramStatus.COMPLETED ? "succeeded" : status.name().toLowerCase());
+    WRAPPERLOGGER.info("Pipeline '{}' {}", getContext().getApplicationSpecification().getName(),
+        status == ProgramStatus.COMPLETED ? "succeeded" : status.name().toLowerCase());
+
   }
 
   private void emitMetrics(DataStreamsPipelineSpec spec) {
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/ETLSpark.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/ETLSpark.java
index 07c28e8787bd..13e5da49ec8a 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/ETLSpark.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/ETLSpark.java
@@ -102,14 +102,9 @@ public void initialize() throws Exception {
     // turn off auto-broadcast by default until we better understand the implications and can set this to a
     // value that we are confident is safe.
     sparkConf.set("spark.sql.autoBroadcastJoinThreshold", "-1");
-    String maxFetchSize = String.valueOf(Integer.MAX_VALUE - 512);
-    sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", maxFetchSize);
-    // In Spark v3.2.0 and later, max netty direct memory must be
-    // >= spark.network.maxRemoteBlockSizeFetchToMem for executors.
-    // So we set max netty direct memory = spark.network.maxRemoteBlockSizeFetchToMem
-    // See CDAP-20758 for details.
-    String nettyMaxDirectMemory = String.format("-Dio.netty.maxDirectMemory=%s", maxFetchSize);
-    sparkConf.set("spark.executor.extraJavaOptions", nettyMaxDirectMemory);
+    // NOTE: If you change this value, also update io.netty.maxDirectMemory in
+    // KubeMasterEnvironment
+    sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", String.valueOf(Integer.MAX_VALUE - 512));
     sparkConf.set("spark.network.timeout", "600s");
     // Disable yarn app retries since spark already performs retries at a task level.
sparkConf.set("spark.yarn.maxAppAttempts", "1"); diff --git a/cdap-kubernetes/src/main/java/io/cdap/cdap/master/environment/k8s/KubeMasterEnvironment.java b/cdap-kubernetes/src/main/java/io/cdap/cdap/master/environment/k8s/KubeMasterEnvironment.java index 7d2568189000..270fc19274c9 100644 --- a/cdap-kubernetes/src/main/java/io/cdap/cdap/master/environment/k8s/KubeMasterEnvironment.java +++ b/cdap-kubernetes/src/main/java/io/cdap/cdap/master/environment/k8s/KubeMasterEnvironment.java @@ -214,6 +214,7 @@ public class KubeMasterEnvironment implements MasterEnvironment { private static final String SPARK_DRIVER_LABEL_VALUE = "driver"; private static final String CDAP_CONTAINER_LABEL = "cdap.container"; private static final String TWILL_RUNNER_SERVICE_MONITOR_DISABLE = "twill.runner.service.monitor.disable"; + private static final String SPARK_EXECUTOR_JAVA_OPTS = "spark.executor.extraJavaOptions"; private static final String CONNECT_TIMEOUT = "master.environment.k8s.connect.timeout.sec"; static final String CONNECT_TIMEOUT_DEFAULT = "120"; @@ -496,6 +497,16 @@ public SparkConfig generateSparkSubmitConfig(SparkSubmitContext sparkSubmitConte sparkConfMap.put(SPARK_EXECUTOR_POD_CPU_REQUEST, String.format("%dm", executorCpuRequested)); sparkConfMap.put(SPARK_EXECUTOR_POD_CPU_LIMIT, String.format("%dm", executorCpuLimit)); + // In Spark v3.2.0 and later, max netty direct memory must be + // >= spark.network.maxRemoteBlockSizeFetchToMem for executors. + // So we set max netty direct memory = spark.network.maxRemoteBlockSizeFetchToMem + // See CDAP-20758 for details. + // NOTE: If spark.network.maxRemoteBlockSizeFetchToMem is changed in ETLSpark.java, + // DataStreamsSparkLaunch.java, then io.netty.maxDirectMemory will need also need to changed + // here. + String nettyMaxDirectMemory = String.format("-Dio.netty.maxDirectMemory=%d", Integer.MAX_VALUE - 512); + sparkConfMap.put(SPARK_EXECUTOR_JAVA_OPTS, nettyMaxDirectMemory); + // Add spark pod labels. This will be same as job labels populateLabels(sparkConfMap);