Merge pull request #15280 from cdapio/CDAP20758-cp
[CDAP-20758] Set max netty direct memory
rmstar authored Aug 8, 2023
2 parents c8b17bb + 9018bcd commit d6c3634
Showing 2 changed files with 31 additions and 10 deletions.
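Both changed files make the same adjustment: the value already used for spark.network.maxRemoteBlockSizeFetchToMem is also handed to executors as netty's direct-memory cap. A minimal standalone sketch of the idea, assuming a plain SparkConf (the class and method names here are illustrative, not part of the commit):

import org.apache.spark.SparkConf;

public class NettyDirectMemoryExample {
  public static void main(String[] args) {
    // Integer.MAX_VALUE - 512 == 2147483135 bytes, i.e. just under 2 GiB.
    String maxFetchSize = String.valueOf(Integer.MAX_VALUE - 512);
    SparkConf sparkConf = new SparkConf();
    // Remote blocks larger than this threshold are fetched to disk rather than held in memory.
    sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", maxFetchSize);
    // Per the commit, Spark 3.2.0+ needs executor netty direct memory >= maxRemoteBlockSizeFetchToMem,
    // so pass a matching -Dio.netty.maxDirectMemory to the executor JVMs.
    sparkConf.set("spark.executor.extraJavaOptions", "-Dio.netty.maxDirectMemory=" + maxFetchSize);
  }
}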
@@ -79,7 +79,8 @@ protected void configure() {
   @Override
   public void initialize() throws Exception {
     SparkClientContext context = getContext();
-    String arguments = Joiner.on(", ").withKeyValueSeparator("=").join(context.getRuntimeArguments());
+    String arguments =
+        Joiner.on(", ").withKeyValueSeparator("=").join(context.getRuntimeArguments());
     WRAPPERLOGGER.info("Pipeline '{}' is started by user '{}' with arguments {}",
         context.getApplicationSpecification().getName(),
         UserGroupInformation.getCurrentUser().getShortUserName(),
@@ -126,16 +127,25 @@ public void initialize() throws Exception {
       }
     }
     sparkConf.set("spark.streaming.backpressure.enabled", "true");
-    sparkConf.set("spark.spark.streaming.blockInterval", String.valueOf(spec.getBatchIntervalMillis() / 5));
-    sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", String.valueOf(Integer.MAX_VALUE - 512));
+    sparkConf.set(
+        "spark.spark.streaming.blockInterval", String.valueOf(spec.getBatchIntervalMillis() / 5));
+    String maxFetchSize = String.valueOf(Integer.MAX_VALUE - 512);
+    sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", maxFetchSize);

     // spark... makes you set this to at least the number of receivers (streaming sources)
     // because it holds one thread per receiver, or one core in distributed mode.
     // so... we have to set this hacky master variable based on the isUnitTest setting in the config
     String extraOpts = spec.getExtraJavaOpts();
+    // In Spark v3.2.0 and later, max netty direct memory must be
+    // >= spark.network.maxRemoteBlockSizeFetchToMem for executors.
+    // So we set max netty direct memory = spark.network.maxRemoteBlockSizeFetchToMem
+    // See CDAP-20758 for details.
+    String nettyMaxDirectMemory = String.format("-Dio.netty.maxDirectMemory=%s", maxFetchSize);
     if (extraOpts != null && !extraOpts.isEmpty()) {
       sparkConf.set("spark.driver.extraJavaOptions", extraOpts);
-      sparkConf.set("spark.executor.extraJavaOptions", extraOpts);
+      sparkConf.set("spark.executor.extraJavaOptions", nettyMaxDirectMemory + " " + extraOpts);
+    } else {
+      sparkConf.set("spark.executor.extraJavaOptions", nettyMaxDirectMemory);
     }
     // without this, stopping will hang on machines with few cores.
     sparkConf.set("spark.rpc.netty.dispatcher.numThreads", String.valueOf(numSources + 2));
@@ -155,8 +165,11 @@ public void initialize() throws Exception {
         try {
           int numExecutors = Integer.parseInt(property.getValue());
           if (numExecutors < minExecutors) {
-            LOG.warn("Number of executors {} is less than the minimum number required to run the pipeline. "
-                + "Automatically increasing it to {}", numExecutors, minExecutors);
+            LOG.warn(
+                "Number of executors {} is less than the minimum number required to run the"
+                    + " pipeline. Automatically increasing it to {}",
+                numExecutors,
+                minExecutors);
             numExecutors = minExecutors;
           }
           sparkConf.set(property.getKey(), String.valueOf(numExecutors));
@@ -186,9 +199,10 @@ public void initialize() throws Exception {
   public void destroy() {
     super.destroy();
     ProgramStatus status = getContext().getState().getStatus();
-    WRAPPERLOGGER.info("Pipeline '{}' {}", getContext().getApplicationSpecification().getName(),
-        status == ProgramStatus.COMPLETED ? "succeeded" : status.name().toLowerCase());
-
+    WRAPPERLOGGER.info(
+        "Pipeline '{}' {}",
+        getContext().getApplicationSpecification().getName(),
+        status == ProgramStatus.COMPLETED ? "succeeded" : status.name().toLowerCase());
   }

   private void emitMetrics(DataStreamsPipelineSpec spec) {
@@ -102,7 +102,14 @@ public void initialize() throws Exception {
     // turn off auto-broadcast by default until we better understand the implications and can set this to a
     // value that we are confident is safe.
     sparkConf.set("spark.sql.autoBroadcastJoinThreshold", "-1");
-    sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", String.valueOf(Integer.MAX_VALUE - 512));
+    String maxFetchSize = String.valueOf(Integer.MAX_VALUE - 512);
+    sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", maxFetchSize);
+    // In Spark v3.2.0 and later, max netty direct memory must be
+    // >= spark.network.maxRemoteBlockSizeFetchToMem for executors.
+    // So we set max netty direct memory = spark.network.maxRemoteBlockSizeFetchToMem
+    // See CDAP-20758 for details.
+    String nettyMaxDirectMemory = String.format("-Dio.netty.maxDirectMemory=%s", maxFetchSize);
+    sparkConf.set("spark.executor.extraJavaOptions", nettyMaxDirectMemory);
     sparkConf.set("spark.network.timeout", "600s");
     // Disable yarn app retries since spark already performs retries at a task level.
     sparkConf.set("spark.yarn.maxAppAttempts", "1");
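In this second file the executor option is set directly rather than merged with user-supplied extra options as in the streaming hunk above. The -Dio.netty.maxDirectMemory flag caps the direct (off-heap) memory netty will use, so setting it to the same 2147483135 bytes as spark.network.maxRemoteBlockSizeFetchToMem keeps the two limits aligned, which is the requirement the in-line comment describes for Spark 3.2.0 and later.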
