Skip to content

Commit

Permalink
[opt](fe) exit FE when transfer to (non)master failed (#34809)
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed May 21, 2024
1 parent 037de3d commit 8de786f
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 132 deletions.
3 changes: 1 addition & 2 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -953,8 +953,7 @@ Status VFileScanner::_get_next_reader() {
COUNTER_UPDATE(_empty_file_counter, 1);
continue;
} else if (!init_status.ok()) {
return Status::InternalError("failed to init reader for file {}, err: {}", range.path,
init_status.to_string());
return Status::InternalError("failed to init reader, err: {}", init_status.to_string());
}

_name_to_col_type.clear();
Expand Down
275 changes: 146 additions & 129 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -1440,137 +1440,147 @@ private void getHelperNodeFromDeployManager() throws Exception {

/**
 * Switches this FE into the master role.
 *
 * <p>The steps run in a fixed order, and {@code toMasterProgress} is updated before each
 * major one so a failure can be attributed to a stage:
 * <ol>
 *   <li>stop and join the replayer thread (if any), then clear {@code isReady}/{@code canRead};</li>
 *   <li>open the edit log and, when {@code Config.edit_log_type} is "bdb", run fencing
 *       (the process exits with status -1 if fencing fails);</li>
 *   <li>replay all remaining journals ({@code replayJournal(-1)}), then run
 *       {@code checkCurrentNodeExist()} and {@code checkBeExecVersion()};</li>
 *   <li>roll the edit log and log the meta version if the journal's version is older than
 *       {@code FeConstants.meta_version};</li>
 *   <li>on first start-up, log the first frontend and set the initial root password;
 *       otherwise refresh default session variables depending on the journal version;</li>
 *   <li>create the default storage policy, record and log {@code masterInfo};</li>
 *   <li>run the post-replay processing, fail all insert-overwrite tasks, start the
 *       master-only and the common daemon threads, and initialize metrics;</li>
 *   <li>set {@code canRead} and {@code isReady} to true, register thread pool metrics and
 *       pre-heat the statistics cache when {@code analysisManager} is present.</li>
 * </ol>
 *
 * <p>Any {@link Throwable} raised by the sequence is logged together with the current
 * {@code toMasterProgress}, and the process then exits with status -1.
 */
@SuppressWarnings({"checkstyle:WhitespaceAfter", "checkstyle:LineLength"})
private void transferToMaster() {
// stop replayer
if (replayer != null) {
replayer.exit();
try {
replayer.join();
} catch (InterruptedException e) {
LOG.warn("got exception when stopping the replayer thread", e);
try {
// stop replayer
if (replayer != null) {
replayer.exit();
try {
replayer.join();
} catch (InterruptedException e) {
LOG.warn("got exception when stopping the replayer thread", e);
}
replayer = null;
}
replayer = null;
}

// set these after the replay thread has stopped, so the replay thread cannot modify them.
isReady.set(false);
canRead.set(false);
// set these after the replay thread has stopped, so the replay thread cannot modify them.
isReady.set(false);
canRead.set(false);

toMasterProgress = "open editlog";
editLog.open();
toMasterProgress = "open editlog";
editLog.open();

if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
if (!haProtocol.fencing()) {
LOG.error("fencing failed. will exit.");
System.exit(-1);
if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
if (!haProtocol.fencing()) {
LOG.error("fencing failed. will exit.");
System.exit(-1);
}
}
}

toMasterProgress = "replay journal";
long replayStartTime = System.currentTimeMillis();
// replay journals. -1 means replay all the journals larger than current journal id.
replayJournal(-1);
long replayEndTime = System.currentTimeMillis();
LOG.info("finish replay in " + (replayEndTime - replayStartTime) + " msec");
toMasterProgress = "replay journal";
long replayStartTime = System.currentTimeMillis();
// replay journals. -1 means replay all the journals larger than current journal id.
replayJournal(-1);
long replayEndTime = System.currentTimeMillis();
LOG.info("finish replay in " + (replayEndTime - replayStartTime) + " msec");

checkCurrentNodeExist();
checkCurrentNodeExist();

checkBeExecVersion();
checkBeExecVersion();

toMasterProgress = "roll editlog";
editLog.rollEditLog();
toMasterProgress = "roll editlog";
editLog.rollEditLog();

// Log meta_version
long journalVersion = MetaContext.get().getMetaVersion();
if (journalVersion < FeConstants.meta_version) {
toMasterProgress = "log meta version";
editLog.logMetaVersion(FeConstants.meta_version);
MetaContext.get().setMetaVersion(FeConstants.meta_version);
}
// Log meta_version
long journalVersion = MetaContext.get().getMetaVersion();
if (journalVersion < FeConstants.meta_version) {
toMasterProgress = "log meta version";
editLog.logMetaVersion(FeConstants.meta_version);
MetaContext.get().setMetaVersion(FeConstants.meta_version);
}

// Log the first frontend
if (isFirstTimeStartUp) {
// if isFirstTimeStartUp is true, frontends must contain this node.
Frontend self = frontends.get(nodeName);
Preconditions.checkNotNull(self);
// OP_ADD_FIRST_FRONTEND is emitted, so it can write to BDBJE even if canWrite is false
editLog.logAddFirstFrontend(self);
// Log the first frontend
if (isFirstTimeStartUp) {
// if isFirstTimeStartUp is true, frontends must contain this node.
Frontend self = frontends.get(nodeName);
Preconditions.checkNotNull(self);
// OP_ADD_FIRST_FRONTEND is emitted, so it can write to BDBJE even if canWrite is false
editLog.logAddFirstFrontend(self);

initLowerCaseTableNames();
// Set initial root password if master FE first time launch.
auth.setInitialRootPassword(Config.initial_root_password);
} else {
if (journalVersion <= FeMetaVersion.VERSION_114) {
// if journal version is at most VERSION_114, it was upgraded from a version before 2.0.
// When upgrading from 1.2 to 2.0, we need to make sure that the parallelism of query remain unchanged
// when switch to pipeline engine, otherwise it may impact the load of entire cluster
// because the default parallelism of pipeline engine is higher than previous version.
// so set parallel_pipeline_task_num to parallel_fragment_exec_instance_num
int newVal = VariableMgr.newSessionVariable().parallelExecInstanceNum;
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.PARALLEL_PIPELINE_TASK_NUM,
String.valueOf(newVal));

// similar reason as above, need to upgrade broadcast scale factor during 1.2 to 2.x
// if the default value has been upgraded
double newBcFactorVal = VariableMgr.newSessionVariable().getBroadcastRightTableScaleFactor();
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR,
String.valueOf(newBcFactorVal));

// similar reason as above, need to upgrade enable_nereids_planner to true
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.ENABLE_NEREIDS_PLANNER,
"true");
}
if (journalVersion <= FeMetaVersion.VERSION_123) {
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", SessionVariable.ENABLE_NEREIDS_DML, "true");
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
SessionVariable.FRAGMENT_TRANSMISSION_COMPRESSION_CODEC, "none");
if (VariableMgr.newSessionVariable().nereidsTimeoutSecond == 5) {
initLowerCaseTableNames();
// Set initial root password if master FE first time launch.
auth.setInitialRootPassword(Config.initial_root_password);
} else {
if (journalVersion <= FeMetaVersion.VERSION_114) {
// if journal version is at most VERSION_114, it was upgraded from a version before 2.0.
// When upgrading from 1.2 to 2.0,
// we need to make sure that the parallelism of query remain unchanged
// when switch to pipeline engine, otherwise it may impact the load of entire cluster
// because the default parallelism of pipeline engine is higher than previous version.
// so set parallel_pipeline_task_num to parallel_fragment_exec_instance_num
int newVal = VariableMgr.newSessionVariable().parallelExecInstanceNum;
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
SessionVariable.PARALLEL_PIPELINE_TASK_NUM,
String.valueOf(newVal));

// similar reason as above, need to upgrade broadcast scale factor during 1.2 to 2.x
// if the default value has been upgraded
double newBcFactorVal = VariableMgr.newSessionVariable().getBroadcastRightTableScaleFactor();
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR,
String.valueOf(newBcFactorVal));

// similar reason as above, need to upgrade enable_nereids_planner to true
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.ENABLE_NEREIDS_PLANNER,
"true");
}
if (journalVersion <= FeMetaVersion.VERSION_123) {
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", SessionVariable.ENABLE_NEREIDS_DML,
"true");
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
SessionVariable.NEREIDS_TIMEOUT_SECOND, "30");
SessionVariable.FRAGMENT_TRANSMISSION_COMPRESSION_CODEC, "none");
if (VariableMgr.newSessionVariable().nereidsTimeoutSecond == 5) {
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
SessionVariable.NEREIDS_TIMEOUT_SECOND, "30");
}
}
}
}

getPolicyMgr().createDefaultStoragePolicy();
getPolicyMgr().createDefaultStoragePolicy();

// MUST set master ip before starting checkpoint thread.
// because checkpoint thread need this info to select non-master FE to push image
// MUST set master ip before starting checkpoint thread.
// because checkpoint thread need this info to select non-master FE to push image

toMasterProgress = "log master info";
this.masterInfo = new MasterInfo(Env.getCurrentEnv().getSelfNode().getHost(),
Config.http_port,
Config.rpc_port);
editLog.logMasterInfo(masterInfo);
LOG.info("logMasterInfo:{}", masterInfo);
toMasterProgress = "log master info";
this.masterInfo = new MasterInfo(Env.getCurrentEnv().getSelfNode().getHost(),
Config.http_port,
Config.rpc_port);
editLog.logMasterInfo(masterInfo);
LOG.info("logMasterInfo:{}", masterInfo);

// for master, the 'isReady' is set behind.
// but we are sure that all metadata is replayed if we get here.
// so no need to check 'isReady' flag in this method
postProcessAfterMetadataReplayed(false);
// for master, the 'isReady' is set behind.
// but we are sure that all metadata is replayed if we get here.
// so no need to check 'isReady' flag in this method
postProcessAfterMetadataReplayed(false);

insertOverwriteManager.allTaskFail();
insertOverwriteManager.allTaskFail();

toMasterProgress = "start daemon threads";
toMasterProgress = "start daemon threads";

// start all daemon threads that only run on the MASTER FE
startMasterOnlyDaemonThreads();
// start other daemon threads that should run on all FEs
startNonMasterDaemonThreads();
// start all daemon threads that only run on the MASTER FE
startMasterOnlyDaemonThreads();
// start other daemon threads that should run on all FEs
startNonMasterDaemonThreads();

MetricRepo.init();
MetricRepo.init();

toMasterProgress = "finished";
canRead.set(true);
isReady.set(true);
checkLowerCaseTableNames();
toMasterProgress = "finished";
canRead.set(true);
isReady.set(true);
checkLowerCaseTableNames();

String msg = "master finished to replay journal, can write now.";
Util.stdoutWithTime(msg);
LOG.info(msg);
// for master, some new thread pools need to register their metrics
ThreadPoolManager.registerAllThreadPoolMetric();
if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
String msg = "master finished to replay journal, can write now.";
Util.stdoutWithTime(msg);
LOG.info(msg);
// for master, some new thread pools need to register their metrics
ThreadPoolManager.registerAllThreadPoolMetric();
if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
}
} catch (Throwable e) {
// When failed to transfer to master, we need to exit the process.
// Otherwise, the process will be in an unknown state.
LOG.error("failed to transfer to master. progress: {}", toMasterProgress, e);
System.exit(-1);
}
}

Expand Down Expand Up @@ -1718,36 +1728,43 @@ private void startNonMasterDaemonThreads() {
/**
 * Switches this FE into a non-master role.
 *
 * <p>{@code isReady} is cleared first. If the current {@code feType} is OBSERVER or
 * FOLLOWER, {@code newType} must be UNKNOWN (enforced with {@code Preconditions.checkState});
 * the replay state is marked as transferred-to-unknown and the method returns without
 * touching {@code canRead}.
 *
 * <p>Otherwise the replayer is created and started if it does not exist yet, and
 * {@code postProcessAfterMetadataReplayed(true)} is run; when it returns false the method
 * returns early. After that it checks the lower-case table name setting, starts the daemon
 * threads common to all FEs, initializes metrics and pre-heats the statistics cache when
 * {@code analysisManager} is present.
 *
 * <p>Any {@link Throwable} raised inside the guarded section is logged and the process then
 * exits with status -1.
 *
 * @param newType the node type this FE is transferring to
 */
private void transferToNonMaster(FrontendNodeType newType) {
isReady.set(false);

if (feType == FrontendNodeType.OBSERVER || feType == FrontendNodeType.FOLLOWER) {
Preconditions.checkState(newType == FrontendNodeType.UNKNOWN);
LOG.warn("{} to UNKNOWN, still offer read service", feType.name());
// do not set canRead here; leave canRead as it was.
// if meta out of date, canRead will be set to false in replayer thread.
metaReplayState.setTransferToUnknown();
return;
}
try {
if (feType == FrontendNodeType.OBSERVER || feType == FrontendNodeType.FOLLOWER) {
Preconditions.checkState(newType == FrontendNodeType.UNKNOWN);
LOG.warn("{} to UNKNOWN, still offer read service", feType.name());
// do not set canRead here; leave canRead as it was.
// if meta out of date, canRead will be set to false in replayer thread.
metaReplayState.setTransferToUnknown();
return;
}

// transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER
// transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER

if (replayer == null) {
createReplayer();
replayer.start();
}
if (replayer == null) {
createReplayer();
replayer.start();
}

// 'isReady' will be set to true in 'setCanRead()' method
if (!postProcessAfterMetadataReplayed(true)) {
// the state has changed, exit early.
return;
}
// 'isReady' will be set to true in 'setCanRead()' method
if (!postProcessAfterMetadataReplayed(true)) {
// the state has changed, exit early.
return;
}

checkLowerCaseTableNames();
checkLowerCaseTableNames();

startNonMasterDaemonThreads();
startNonMasterDaemonThreads();

MetricRepo.init();
MetricRepo.init();

if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
}
} catch (Throwable e) {
// When failed to transfer to non-master, we need to exit the process.
// Otherwise, the process will be in an unknown state.
LOG.error("failed to transfer to non-master.", e);
System.exit(-1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public FeServer(int port) {

public void start() throws IOException {
FrontendServiceImpl service = new FrontendServiceImpl(ExecuteEnv.getInstance());
Logger feServiceLogger = LogManager.getLogger(FrontendServiceImpl.class);
Logger feServiceLogger = LogManager.getLogger(FeServer.class);
FrontendService.Iface instance = (FrontendService.Iface) Proxy.newProxyInstance(
FrontendServiceImpl.class.getClassLoader(),
FrontendServiceImpl.class.getInterfaces(),
Expand Down

0 comments on commit 8de786f

Please sign in to comment.