Skip to content

Commit

Permalink
HDFS-17242. Make congestion backoff time configurable. (#6227)
Browse files Browse the repository at this point in the history
Reviewed-by: Xing Lin <[email protected]>
Reviewed-by: Ayush Saxena <[email protected]>
Signed-off-by: Tao Li <[email protected]>
  • Loading branch information
hfutatzhanghb authored Dec 13, 2023
1 parent 19b9e6a commit 562c42c
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
Expand Down Expand Up @@ -528,9 +529,8 @@ boolean doWaitForRestart() {
// are congested
private final List<DatanodeInfo> congestedNodes = new ArrayList<>();
private final Map<DatanodeInfo, Integer> slowNodeMap = new HashMap<>();
private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000;
private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS =
CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
private int congestionBackOffMeanTimeInMs;
private int congestionBackOffMaxTimeInMs;
private int lastCongestionBackoffTime;
private int maxPipelineRecoveryRetries;
private int markSlowNodeAsBadNodeThreshold;
Expand Down Expand Up @@ -564,6 +564,35 @@ private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
this.addBlockFlags = flags;
this.maxPipelineRecoveryRetries = conf.getMaxPipelineRecoveryRetries();
this.markSlowNodeAsBadNodeThreshold = conf.getMarkSlowNodeAsBadNodeThreshold();
congestionBackOffMeanTimeInMs = dfsClient.getConfiguration().getInt(
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME,
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT);
congestionBackOffMaxTimeInMs = dfsClient.getConfiguration().getInt(
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME,
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT);
if (congestionBackOffMeanTimeInMs <= 0) {
LOG.warn("Configuration: {} is not appropriate, using default value: {}",
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME,
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT);
}
if (congestionBackOffMaxTimeInMs <= 0) {
LOG.warn("Configuration: {} is not appropriate, using default value: {}",
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME,
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT);
}
if (congestionBackOffMaxTimeInMs < congestionBackOffMeanTimeInMs) {
LOG.warn("Configuration: {} can not less than {}, using their default values.",
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME,
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME);
}
if (congestionBackOffMeanTimeInMs <= 0 || congestionBackOffMaxTimeInMs <= 0 ||
congestionBackOffMaxTimeInMs < congestionBackOffMeanTimeInMs) {
congestionBackOffMeanTimeInMs =
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT;
congestionBackOffMaxTimeInMs =
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT;
}

}

/**
Expand Down Expand Up @@ -1998,10 +2027,10 @@ private void backOffIfNecessary() throws InterruptedException {
sb.append(' ').append(i);
}
int range = Math.abs(lastCongestionBackoffTime * 3 -
CONGESTION_BACKOFF_MEAN_TIME_IN_MS);
congestionBackOffMeanTimeInMs);
int base = Math.min(lastCongestionBackoffTime * 3,
CONGESTION_BACKOFF_MEAN_TIME_IN_MS);
t = Math.min(CONGESTION_BACK_OFF_MAX_TIME_IN_MS,
congestionBackOffMeanTimeInMs);
t = Math.min(congestionBackOffMaxTimeInMs,
(int)(base + Math.random() * range));
lastCongestionBackoffTime = t;
sb.append(" are congested. Backing off for ").append(t).append(" ms");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,15 @@ public interface HdfsClientConfigKeys {
"dfs.client.output.stream.uniq.default.key";
String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT";

String DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME =
"dfs.client.congestion.backoff.mean.time";
int DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT = 5000;

String DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME =
"dfs.client.congestion.backoff.max.time";
int DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT =
DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT * 10;

/**
* These are deprecated config keys to client code.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6559,6 +6559,22 @@
If the namespace is DEFAULT, it's best to change this conf to other value.
</description>
</property>
<property>
<name>dfs.client.congestion.backoff.mean.time</name>
<value>5000</value>
<description>
The mean time in milliseconds which is used to compute
client congestion backoff sleep time.
</description>
</property>
<property>
<name>dfs.client.congestion.backoff.max.time</name>
<value>50000</value>
<description>
The max time in milliseconds which is used to restrict
the upper limit backoff sleep time for client.
</description>
</property>
<property>
<name>dfs.client.rbf.observer.read.enable</name>
<value>false</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ private void runAdjustChunkBoundary(
public void testCongestionBackoff() throws IOException {
DfsClientConf dfsClientConf = mock(DfsClientConf.class);
DFSClient client = mock(DFSClient.class);
Configuration conf = mock(Configuration.class);
when(client.getConfiguration()).thenReturn(conf);
when(client.getConf()).thenReturn(dfsClientConf);
when(client.getTracer()).thenReturn(FsTracer.get(new Configuration()));
client.clientRunning = true;
Expand Down Expand Up @@ -306,6 +308,8 @@ public void testCongestionBackoff() throws IOException {
public void testCongestionAckDelay() {
DfsClientConf dfsClientConf = mock(DfsClientConf.class);
DFSClient client = mock(DFSClient.class);
Configuration conf = mock(Configuration.class);
when(client.getConfiguration()).thenReturn(conf);
when(client.getConf()).thenReturn(dfsClientConf);
when(client.getTracer()).thenReturn(FsTracer.get(new Configuration()));
client.clientRunning = true;
Expand All @@ -325,7 +329,7 @@ public void testCongestionAckDelay() {
ArrayList<DatanodeInfo> congestedNodes = (ArrayList<DatanodeInfo>)
Whitebox.getInternalState(stream, "congestedNodes");
int backOffMaxTime = (int)
Whitebox.getInternalState(stream, "CONGESTION_BACK_OFF_MAX_TIME_IN_MS");
Whitebox.getInternalState(stream, "congestionBackOffMaxTimeInMs");
DFSPacket[] packet = new DFSPacket[100];
AtomicBoolean isDelay = new AtomicBoolean(true);

Expand Down

0 comments on commit 562c42c

Please sign in to comment.