Merge pull request #15288 from cdapio/fix_netty_mem_develop
Fix netty mem develop
rmstar authored Aug 12, 2023
2 parents 1cc0a61 + 1bd0fbe commit fd11316
Showing 3 changed files with 25 additions and 31 deletions.
@@ -79,8 +79,7 @@ protected void configure() {
   @Override
   public void initialize() throws Exception {
     SparkClientContext context = getContext();
-    String arguments =
-        Joiner.on(", ").withKeyValueSeparator("=").join(context.getRuntimeArguments());
+    String arguments = Joiner.on(", ").withKeyValueSeparator("=").join(context.getRuntimeArguments());
     WRAPPERLOGGER.info("Pipeline '{}' is started by user '{}' with arguments {}",
         context.getApplicationSpecification().getName(),
         UserGroupInformation.getCurrentUser().getShortUserName(),
@@ -127,25 +126,18 @@ public void initialize() throws Exception {
       }
     }
     sparkConf.set("spark.streaming.backpressure.enabled", "true");
-    sparkConf.set(
-        "spark.spark.streaming.blockInterval", String.valueOf(spec.getBatchIntervalMillis() / 5));
-    String maxFetchSize = String.valueOf(Integer.MAX_VALUE - 512);
-    sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", maxFetchSize);
+    sparkConf.set("spark.spark.streaming.blockInterval", String.valueOf(spec.getBatchIntervalMillis() / 5));
+    // NOTE: If you change this value, also update io.netty.maxDirectMemory in
+    // KubeMasterEnvironment
+    sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", String.valueOf(Integer.MAX_VALUE - 512));

     // spark... makes you set this to at least the number of receivers (streaming sources)
     // because it holds one thread per receiver, or one core in distributed mode.
     // so... we have to set this hacky master variable based on the isUnitTest setting in the config
     String extraOpts = spec.getExtraJavaOpts();
-    // In Spark v3.2.0 and later, max netty direct memory must be
-    // >= spark.network.maxRemoteBlockSizeFetchToMem for executors.
-    // So we set max netty direct memory = spark.network.maxRemoteBlockSizeFetchToMem
-    // See CDAP-20758 for details.
-    String nettyMaxDirectMemory = String.format("-Dio.netty.maxDirectMemory=%s", maxFetchSize);
     if (extraOpts != null && !extraOpts.isEmpty()) {
       sparkConf.set("spark.driver.extraJavaOptions", extraOpts);
-      sparkConf.set("spark.executor.extraJavaOptions", nettyMaxDirectMemory + " " + extraOpts);
-    } else {
-      sparkConf.set("spark.executor.extraJavaOptions", nettyMaxDirectMemory);
+      sparkConf.set("spark.executor.extraJavaOptions", extraOpts);
     }
     // without this, stopping will hang on machines with few cores.
     sparkConf.set("spark.rpc.netty.dispatcher.numThreads", String.valueOf(numSources + 2));
@@ -165,11 +157,8 @@ public void initialize() throws Exception {
       try {
         int numExecutors = Integer.parseInt(property.getValue());
         if (numExecutors < minExecutors) {
-          LOG.warn(
-              "Number of executors {} is less than the minimum number required to run the"
-                  + " pipeline. Automatically increasing it to {}",
-              numExecutors,
-              minExecutors);
+          LOG.warn("Number of executors {} is less than the minimum number required to run the pipeline. "
+              + "Automatically increasing it to {}", numExecutors, minExecutors);
           numExecutors = minExecutors;
         }
         sparkConf.set(property.getKey(), String.valueOf(numExecutors));
@@ -199,10 +188,9 @@ public void initialize() throws Exception {
   public void destroy() {
     super.destroy();
     ProgramStatus status = getContext().getState().getStatus();
-    WRAPPERLOGGER.info(
-        "Pipeline '{}' {}",
-        getContext().getApplicationSpecification().getName(),
-        status == ProgramStatus.COMPLETED ? "succeeded" : status.name().toLowerCase());
+    WRAPPERLOGGER.info("Pipeline '{}' {}", getContext().getApplicationSpecification().getName(),
+        status == ProgramStatus.COMPLETED ? "succeeded" : status.name().toLowerCase());
+
   }

   private void emitMetrics(DataStreamsPipelineSpec spec) {
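Note on the streaming launcher diff above: the fetch-to-memory threshold is still pinned to Integer.MAX_VALUE - 512, but the matching -Dio.netty.maxDirectMemory flag is no longer injected into spark.executor.extraJavaOptions here; only user-supplied extra options are passed through, and the Netty ceiling is set once in KubeMasterEnvironment (see the NOTE comment added above). A minimal standalone sketch of how the two settings relate, illustrative only and not the CDAP code (the class name and conf variable are assumptions):

import org.apache.spark.SparkConf;

public class FetchToMemExample {
  public static void main(String[] args) {
    // Blocks larger than this threshold are fetched to disk instead of memory;
    // the launchers in this commit set it just under 2 GiB.
    long maxFetchToMem = Integer.MAX_VALUE - 512L;

    SparkConf conf = new SparkConf()
        .set("spark.network.maxRemoteBlockSizeFetchToMem", String.valueOf(maxFetchToMem))
        // Since Spark 3.2, executor Netty direct memory must be at least as large as the
        // fetch-to-memory threshold (see CDAP-20758), which is what this flag guarantees.
        .set("spark.executor.extraJavaOptions", "-Dio.netty.maxDirectMemory=" + maxFetchToMem);
    System.out.println(conf.toDebugString());
  }
}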
@@ -102,14 +102,9 @@ public void initialize() throws Exception {
     // turn off auto-broadcast by default until we better understand the implications and can set this to a
     // value that we are confident is safe.
     sparkConf.set("spark.sql.autoBroadcastJoinThreshold", "-1");
-    String maxFetchSize = String.valueOf(Integer.MAX_VALUE - 512);
-    sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", maxFetchSize);
-    // In Spark v3.2.0 and later, max netty direct memory must be
-    // >= spark.network.maxRemoteBlockSizeFetchToMem for executors.
-    // So we set max netty direct memory = spark.network.maxRemoteBlockSizeFetchToMem
-    // See CDAP-20758 for details.
-    String nettyMaxDirectMemory = String.format("-Dio.netty.maxDirectMemory=%s", maxFetchSize);
-    sparkConf.set("spark.executor.extraJavaOptions", nettyMaxDirectMemory);
+    // NOTE: If you change this value, also update io.netty.maxDirectMemory in
+    // KubeMasterEnvironment
+    sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", String.valueOf(Integer.MAX_VALUE - 512));
     sparkConf.set("spark.network.timeout", "600s");
     // Disable yarn app retries since spark already performs retries at a task level.
     sparkConf.set("spark.yarn.maxAppAttempts", "1");
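The batch launcher above gets the same treatment: spark.network.maxRemoteBlockSizeFetchToMem stays, while the executor-side Netty flag moves to KubeMasterEnvironment. A hypothetical diagnostic, not part of this commit, that an executor-side process could run to confirm the Netty ceiling covers the threshold (assumes Netty is on the classpath):

import io.netty.util.internal.PlatformDependent;

public class NettyDirectMemoryCheck {
  public static void main(String[] args) {
    // Value the launchers use for spark.network.maxRemoteBlockSizeFetchToMem.
    long maxFetchToMem = Integer.MAX_VALUE - 512L;
    // Ceiling Netty computes from -Dio.netty.maxDirectMemory, typically falling back
    // to the JVM direct-memory limit when the property is not set.
    long nettyCeiling = PlatformDependent.maxDirectMemory();
    if (nettyCeiling < maxFetchToMem) {
      System.err.printf("Netty direct memory %d is below the fetch-to-memory threshold %d; "
          + "large shuffle fetches may hit direct-memory limits.%n", nettyCeiling, maxFetchToMem);
    } else {
      System.out.printf("Netty direct memory %d covers the %d-byte threshold.%n",
          nettyCeiling, maxFetchToMem);
    }
  }
}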
@@ -214,6 +214,7 @@ public class KubeMasterEnvironment implements MasterEnvironment {
   private static final String SPARK_DRIVER_LABEL_VALUE = "driver";
   private static final String CDAP_CONTAINER_LABEL = "cdap.container";
   private static final String TWILL_RUNNER_SERVICE_MONITOR_DISABLE = "twill.runner.service.monitor.disable";
+  private static final String SPARK_EXECUTOR_JAVA_OPTS = "spark.executor.extraJavaOptions";

   private static final String CONNECT_TIMEOUT = "master.environment.k8s.connect.timeout.sec";
   static final String CONNECT_TIMEOUT_DEFAULT = "120";
@@ -496,6 +497,16 @@ public SparkConfig generateSparkSubmitConfig(SparkSubmitContext sparkSubmitConte
     sparkConfMap.put(SPARK_EXECUTOR_POD_CPU_REQUEST, String.format("%dm", executorCpuRequested));
     sparkConfMap.put(SPARK_EXECUTOR_POD_CPU_LIMIT, String.format("%dm", executorCpuLimit));

+    // In Spark v3.2.0 and later, max netty direct memory must be
+    // >= spark.network.maxRemoteBlockSizeFetchToMem for executors.
+    // So we set max netty direct memory = spark.network.maxRemoteBlockSizeFetchToMem
+    // See CDAP-20758 for details.
+    // NOTE: If spark.network.maxRemoteBlockSizeFetchToMem is changed in ETLSpark.java or
+    // DataStreamsSparkLaunch.java, then io.netty.maxDirectMemory will also need to be
+    // changed here.
+    String nettyMaxDirectMemory = String.format("-Dio.netty.maxDirectMemory=%d", Integer.MAX_VALUE - 512);
+    sparkConfMap.put(SPARK_EXECUTOR_JAVA_OPTS, nettyMaxDirectMemory);
+
     // Add spark pod labels. This will be same as job labels
     populateLabels(sparkConfMap);
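With the flag now generated in KubeMasterEnvironment, every executor pod launched through generateSparkSubmitConfig picks up -Dio.netty.maxDirectMemory=2147483135 (Integer.MAX_VALUE - 512) via spark.executor.extraJavaOptions. One note on the new put call: Map.put replaces whatever value the map already holds for that key, so if other executor Java options ever need to coexist with the Netty flag in sparkConfMap, a merge-style variant would be needed; a hypothetical version, not part of this commit:

// Hypothetical alternative, not in this commit: prepend the Netty ceiling to any
// executor options already present in the map instead of replacing them.
String nettyMaxDirectMemory = String.format("-Dio.netty.maxDirectMemory=%d", Integer.MAX_VALUE - 512);
sparkConfMap.merge(SPARK_EXECUTOR_JAVA_OPTS, nettyMaxDirectMemory,
    (existing, added) -> added + " " + existing);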
