Skip to content

Commit

Permalink
[fix](Checkpoint)fix bug of master push image while it's http service…
Browse files Browse the repository at this point in the history
… not serving(#22714)
  • Loading branch information
kobe6th committed Aug 9, 2023
1 parent 2bcf6c2 commit 240b450
Showing 1 changed file with 58 additions and 14 deletions.
72 changes: 58 additions & 14 deletions fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.persist.MetaCleaner;
import org.apache.doris.persist.Storage;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Frontend;

import com.google.common.base.Strings;
Expand All @@ -51,6 +52,7 @@ public class Checkpoint extends MasterDaemon {
private static final int PUT_TIMEOUT_SECOND = 3600;
private static final int CONNECT_TIMEOUT_SECOND = 1;
private static final int READ_TIMEOUT_SECOND = 1;
private static final int RETRY_TIME = 5;

private Env env;
private String imageDir;
Expand Down Expand Up @@ -183,26 +185,45 @@ public synchronized void doCheckpoint() throws CheckpointException {
int otherNodesCount = 0;
if (!allFrontends.isEmpty()) {
otherNodesCount = allFrontends.size() - 1; // skip master itself
for (Frontend fe : allFrontends) {
String host = fe.getHost();
if (host.equals(Env.getServingEnv().getMasterIp())) {
// skip master itself
continue;
int port = Config.http_port;
// make sure master FE http service is serving before pushing the image.
boolean isHttpServing = false;
for (int i = 0; i < RETRY_TIME && !isHttpServing; i++) {
isHttpServing = checkHttpServing(FrontendOptions.getLocalHostAddress(), port);
if (isHttpServing) {
isHttpServing = true;
break;
}
int port = Config.http_port;

String url = "http://" + host + ":" + port + "/put?version=" + replayedJournalId
+ "&port=" + port;
LOG.info("Put image:{}", url);

try {
MetaHelper.getRemoteFile(url, PUT_TIMEOUT_SECOND * 1000, new NullOutputStream());
successPushed++;
} catch (IOException e) {
LOG.error("Exception when pushing image file. url = {}", url, e);
Thread.sleep(3000);
} catch (InterruptedException e) {
LOG.warn("InterruptedException", e);
}
}

if (isHttpServing) {
for (Frontend fe : allFrontends) {
String host = fe.getHost();
if (host.equals(Env.getServingEnv().getMasterIp())) {
// skip master itself
continue;
}

String url = "http://" + host + ":" + port + "/put?version=" + replayedJournalId
+ "&port=" + port;
LOG.info("Put image:{}", url);

try {
MetaHelper.getRemoteFile(url, PUT_TIMEOUT_SECOND * 1000, new NullOutputStream());
successPushed++;
} catch (IOException e) {
LOG.error("Exception when pushing image file. url = {}", url, e);
}
}
} else {
LOG.warn("push image error cause master FE http service is not serving");
}
LOG.info("push image.{} to other nodes. totally {} nodes, push succeed {} nodes",
replayedJournalId, otherNodesCount, successPushed);
}
Expand Down Expand Up @@ -347,4 +368,27 @@ private long getMemoryUsedPercent() {
return used * 100 / max;
}
}

/*
* Check if FE http service is serving
* Return true if http code is 200
*/
private boolean checkHttpServing(String host, int port) {
URL idURL;
HttpURLConnection conn = null;
try {
idURL = new URL("http://" + host + ":" + port);
conn = (HttpURLConnection) idURL.openConnection();
conn.setConnectTimeout(CONNECT_TIMEOUT_SECOND * 1000);
conn.setReadTimeout(READ_TIMEOUT_SECOND * 1000);
return conn.getResponseCode() == 200;
} catch (Throwable e) {
LOG.warn("Exception when query FE http service, host: {}, port: {}, e: {}", host, port, e);
} finally {
if (conn != null) {
conn.disconnect();
}
}
return false;
}
}

0 comments on commit 240b450

Please sign in to comment.