[CDAP-20758] Set max netty direct memory #15274

Merged 1 commit on Aug 8, 2023
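Summary: Spark v3.2.0 and later require Netty's direct-memory ceiling on executors to be at least spark.network.maxRemoteBlockSizeFetchToMem. Since this code pins that property to just under 2 GiB (Integer.MAX_VALUE - 512 bytes), the change also passes -Dio.netty.maxDirectMemory with the same value through spark.executor.extraJavaOptions in both changed files. A minimal sketch of the constraint being addressed, assuming only that Netty is on the classpath (the class name is hypothetical; PlatformDependent is Netty's real internal utility):

// Not part of this PR: illustrates the constraint described in the diff
// comments below. Netty's effective direct-memory limit must cover the
// largest remote block Spark may fetch to memory.
import io.netty.util.internal.PlatformDependent;

public class NettyDirectMemoryCheck {
  public static void main(String[] args) {
    // Same value the PR sets for spark.network.maxRemoteBlockSizeFetchToMem.
    long maxFetchSize = Integer.MAX_VALUE - 512L;
    // Honors -Dio.netty.maxDirectMemory when set; otherwise falls back to
    // the JVM's own max direct memory.
    long nettyLimit = PlatformDependent.maxDirectMemory();
    if (nettyLimit < maxFetchSize) {
      System.out.printf(
          "Netty limit %d < %d; pass -Dio.netty.maxDirectMemory=%d to the executor JVM%n",
          nettyLimit, maxFetchSize, maxFetchSize);
    }
  }
}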
@@ -79,7 +79,8 @@ protected void configure() {
   @Override
   public void initialize() throws Exception {
     SparkClientContext context = getContext();
-    String arguments = Joiner.on(", ").withKeyValueSeparator("=").join(context.getRuntimeArguments());
+    String arguments =
+        Joiner.on(", ").withKeyValueSeparator("=").join(context.getRuntimeArguments());
     WRAPPERLOGGER.info("Pipeline '{}' is started by user '{}' with arguments {}",
         context.getApplicationSpecification().getName(),
         UserGroupInformation.getCurrentUser().getShortUserName(),
@@ -126,16 +127,25 @@ public void initialize() throws Exception {
       }
     }
     sparkConf.set("spark.streaming.backpressure.enabled", "true");
-    sparkConf.set("spark.spark.streaming.blockInterval", String.valueOf(spec.getBatchIntervalMillis() / 5));
-    sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", String.valueOf(Integer.MAX_VALUE - 512));
+    sparkConf.set(
+        "spark.spark.streaming.blockInterval", String.valueOf(spec.getBatchIntervalMillis() / 5));
+    String maxFetchSize = String.valueOf(Integer.MAX_VALUE - 512);
+    sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", maxFetchSize);

     // spark... makes you set this to at least the number of receivers (streaming sources)
     // because it holds one thread per receiver, or one core in distributed mode.
     // so... we have to set this hacky master variable based on the isUnitTest setting in the config
     String extraOpts = spec.getExtraJavaOpts();
+    // In Spark v3.2.0 and later, max netty direct memory must be
+    // >= spark.network.maxRemoteBlockSizeFetchToMem for executors.
+    // So we set max netty direct memory = spark.network.maxRemoteBlockSizeFetchToMem
+    // See CDAP-20758 for details.
+    String nettyMaxDirectMemory = String.format("-Dio.netty.maxDirectMemory=%s", maxFetchSize);
     if (extraOpts != null && !extraOpts.isEmpty()) {
       sparkConf.set("spark.driver.extraJavaOptions", extraOpts);
-      sparkConf.set("spark.executor.extraJavaOptions", extraOpts);
+      sparkConf.set("spark.executor.extraJavaOptions", nettyMaxDirectMemory + " " + extraOpts);
+    } else {
+      sparkConf.set("spark.executor.extraJavaOptions", nettyMaxDirectMemory);
     }
     // without this, stopping will hang on machines with few cores.
     sparkConf.set("spark.rpc.netty.dispatcher.numThreads", String.valueOf(numSources + 2));
@@ -155,8 +165,11 @@ public void initialize() throws Exception {
       try {
         int numExecutors = Integer.parseInt(property.getValue());
         if (numExecutors < minExecutors) {
-          LOG.warn("Number of executors {} is less than the minimum number required to run the pipeline. "
-              + "Automatically increasing it to {}", numExecutors, minExecutors);
+          LOG.warn(
+              "Number of executors {} is less than the minimum number required to run the"
+                  + " pipeline. Automatically increasing it to {}",
+              numExecutors,
+              minExecutors);
           numExecutors = minExecutors;
         }
         sparkConf.set(property.getKey(), String.valueOf(numExecutors));
@@ -186,9 +199,10 @@ public void initialize() throws Exception {
   public void destroy() {
     super.destroy();
     ProgramStatus status = getContext().getState().getStatus();
-    WRAPPERLOGGER.info("Pipeline '{}' {}", getContext().getApplicationSpecification().getName(),
-        status == ProgramStatus.COMPLETED ? "succeeded" : status.name().toLowerCase());
-
+    WRAPPERLOGGER.info(
+        "Pipeline '{}' {}",
+        getContext().getApplicationSpecification().getName(),
+        status == ProgramStatus.COMPLETED ? "succeeded" : status.name().toLowerCase());
   }

   private void emitMetrics(DataStreamsPipelineSpec spec) {
(Second changed file)

@@ -102,7 +102,14 @@ public void initialize() throws Exception {
     // turn off auto-broadcast by default until we better understand the implications and can set this to a
     // value that we are confident is safe.
     sparkConf.set("spark.sql.autoBroadcastJoinThreshold", "-1");
-    sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", String.valueOf(Integer.MAX_VALUE - 512));
+    String maxFetchSize = String.valueOf(Integer.MAX_VALUE - 512);
+    sparkConf.set("spark.network.maxRemoteBlockSizeFetchToMem", maxFetchSize);
+    // In Spark v3.2.0 and later, max netty direct memory must be
+    // >= spark.network.maxRemoteBlockSizeFetchToMem for executors.
+    // So we set max netty direct memory = spark.network.maxRemoteBlockSizeFetchToMem
+    // See CDAP-20758 for details.
+    String nettyMaxDirectMemory = String.format("-Dio.netty.maxDirectMemory=%s", maxFetchSize);
+    sparkConf.set("spark.executor.extraJavaOptions", nettyMaxDirectMemory);
     sparkConf.set("spark.network.timeout", "600s");
     // Disable yarn app retries since spark already performs retries at a task level.
     sparkConf.set("spark.yarn.maxAppAttempts", "1");
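For reference, the same guard can be applied to a vanilla Spark application outside CDAP. A minimal sketch with values mirroring the PR (the class and app names are arbitrary):

import org.apache.spark.SparkConf;

public class StandaloneEquivalent {
  public static void main(String[] args) {
    String maxFetchSize = String.valueOf(Integer.MAX_VALUE - 512);
    SparkConf conf = new SparkConf()
        .setAppName("netty-direct-memory-example")
        // Keep the two values in lockstep, as the PR does, so executors can
        // always buffer the largest fetch-to-memory block in direct memory.
        .set("spark.network.maxRemoteBlockSizeFetchToMem", maxFetchSize)
        .set("spark.executor.extraJavaOptions",
            "-Dio.netty.maxDirectMemory=" + maxFetchSize);
    System.out.println(conf.toDebugString());
  }
}

The same two settings can also be supplied at submit time with --conf flags instead of in code.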