diff --git a/cdap-app-templates/cdap-etl/cdap-data-streams-base/src/main/java/io/cdap/cdap/datastreams/DataStreamsSparkLauncher.java b/cdap-app-templates/cdap-etl/cdap-data-streams-base/src/main/java/io/cdap/cdap/datastreams/DataStreamsSparkLauncher.java index f94dec78d217..865f9939c276 100644 --- a/cdap-app-templates/cdap-etl/cdap-data-streams-base/src/main/java/io/cdap/cdap/datastreams/DataStreamsSparkLauncher.java +++ b/cdap-app-templates/cdap-etl/cdap-data-streams-base/src/main/java/io/cdap/cdap/datastreams/DataStreamsSparkLauncher.java @@ -79,7 +79,8 @@ protected void configure() { @Override public void initialize() throws Exception { SparkClientContext context = getContext(); - String arguments = Joiner.on(", ").withKeyValueSeparator("=").join(context.getRuntimeArguments()); + String arguments = + Joiner.on(", ").withKeyValueSeparator("=").join(context.getRuntimeArguments()); WRAPPERLOGGER.info("Pipeline '{}' is started by user '{}' with arguments {}", context.getApplicationSpecification().getName(), UserGroupInformation.getCurrentUser().getShortUserName(), @@ -126,16 +127,25 @@ public void initialize() throws Exception { } } sparkConf.set("spark.streaming.backpressure.enabled", "true"); - sparkConf.set("spark.spark.streaming.blockInterval", String.valueOf(spec.getBatchIntervalMillis() / 5)); - sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", String.valueOf(Integer.MAX_VALUE - 512)); + sparkConf.set( + "spark.spark.streaming.blockInterval", String.valueOf(spec.getBatchIntervalMillis() / 5)); + String maxFetchSize = String.valueOf(Integer.MAX_VALUE - 512); + sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", maxFetchSize); // spark... makes you set this to at least the number of receivers (streaming sources) // because it holds one thread per receiver, or one core in distributed mode. // so... we have to set this hacky master variable based on the isUnitTest setting in the config String extraOpts = spec.getExtraJavaOpts(); + // In Spark v3.2.0 and later, max netty direct memory must be + // >= spark.network.maxRemoteBlockSizeFetchToMem for executors. + // So we set max netty direct memory = spark.network.maxRemoteBlockSizeFetchToMem + // See CDAP-20758 for details. + String nettyMaxDirectMemory = String.format("-Dio.netty.maxDirectMemory=%s", maxFetchSize); if (extraOpts != null && !extraOpts.isEmpty()) { sparkConf.set("spark.driver.extraJavaOptions", extraOpts); - sparkConf.set("spark.executor.extraJavaOptions", extraOpts); + sparkConf.set("spark.executor.extraJavaOptions", nettyMaxDirectMemory + " " + extraOpts); + } else { + sparkConf.set("spark.executor.extraJavaOptions", nettyMaxDirectMemory); } // without this, stopping will hang on machines with few cores. sparkConf.set("spark.rpc.netty.dispatcher.numThreads", String.valueOf(numSources + 2)); @@ -155,8 +165,11 @@ public void initialize() throws Exception { try { int numExecutors = Integer.parseInt(property.getValue()); if (numExecutors < minExecutors) { - LOG.warn("Number of executors {} is less than the minimum number required to run the pipeline. " - + "Automatically increasing it to {}", numExecutors, minExecutors); + LOG.warn( + "Number of executors {} is less than the minimum number required to run the" + + " pipeline. Automatically increasing it to {}", + numExecutors, + minExecutors); numExecutors = minExecutors; } sparkConf.set(property.getKey(), String.valueOf(numExecutors)); @@ -186,9 +199,10 @@ public void initialize() throws Exception { public void destroy() { super.destroy(); ProgramStatus status = getContext().getState().getStatus(); - WRAPPERLOGGER.info("Pipeline '{}' {}", getContext().getApplicationSpecification().getName(), - status == ProgramStatus.COMPLETED ? "succeeded" : status.name().toLowerCase()); - + WRAPPERLOGGER.info( + "Pipeline '{}' {}", + getContext().getApplicationSpecification().getName(), + status == ProgramStatus.COMPLETED ? "succeeded" : status.name().toLowerCase()); } private void emitMetrics(DataStreamsPipelineSpec spec) { diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/ETLSpark.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/ETLSpark.java index fb5caf9574ec..07c28e8787bd 100644 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/ETLSpark.java +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/ETLSpark.java @@ -102,7 +102,14 @@ public void initialize() throws Exception { // turn off auto-broadcast by default until we better understand the implications and can set this to a // value that we are confident is safe. sparkConf.set("spark.sql.autoBroadcastJoinThreshold", "-1"); - sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", String.valueOf(Integer.MAX_VALUE - 512)); + String maxFetchSize = String.valueOf(Integer.MAX_VALUE - 512); + sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", maxFetchSize); + // In Spark v3.2.0 and later, max netty direct memory must be + // >= spark.network.maxRemoteBlockSizeFetchToMem for executors. + // So we set max netty direct memory = spark.network.maxRemoteBlockSizeFetchToMem + // See CDAP-20758 for details. + String nettyMaxDirectMemory = String.format("-Dio.netty.maxDirectMemory=%s", maxFetchSize); + sparkConf.set("spark.executor.extraJavaOptions", nettyMaxDirectMemory); sparkConf.set("spark.network.timeout", "600s"); // Disable yarn app retries since spark already performs retries at a task level. sparkConf.set("spark.yarn.maxAppAttempts", "1");