Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](fe) exit FE when transfer to (non)master failed (#34809) #35158

Merged
merged 1 commit into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -953,8 +953,7 @@ Status VFileScanner::_get_next_reader() {
COUNTER_UPDATE(_empty_file_counter, 1);
continue;
} else if (!init_status.ok()) {
return Status::InternalError("failed to init reader for file {}, err: {}", range.path,
init_status.to_string());
return Status::InternalError("failed to init reader, err: {}", init_status.to_string());
}

_name_to_col_type.clear();
Expand Down
275 changes: 146 additions & 129 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -1440,137 +1440,147 @@ private void getHelperNodeFromDeployManager() throws Exception {

@SuppressWarnings({"checkstyle:WhitespaceAfter", "checkstyle:LineLength"})
private void transferToMaster() {
// stop replayer
if (replayer != null) {
replayer.exit();
try {
replayer.join();
} catch (InterruptedException e) {
LOG.warn("got exception when stopping the replayer thread", e);
try {
// stop replayer
if (replayer != null) {
replayer.exit();
try {
replayer.join();
} catch (InterruptedException e) {
LOG.warn("got exception when stopping the replayer thread", e);
}
replayer = null;
}
replayer = null;
}

// set these after the replay thread has stopped, to avoid the replay thread modifying them.
isReady.set(false);
canRead.set(false);
// set these after the replay thread has stopped, to avoid the replay thread modifying them.
isReady.set(false);
canRead.set(false);

toMasterProgress = "open editlog";
editLog.open();
toMasterProgress = "open editlog";
editLog.open();

if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
if (!haProtocol.fencing()) {
LOG.error("fencing failed. will exit.");
System.exit(-1);
if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
if (!haProtocol.fencing()) {
LOG.error("fencing failed. will exit.");
System.exit(-1);
}
}
}

toMasterProgress = "replay journal";
long replayStartTime = System.currentTimeMillis();
// replay journals. -1 means replay all the journals larger than current journal id.
replayJournal(-1);
long replayEndTime = System.currentTimeMillis();
LOG.info("finish replay in " + (replayEndTime - replayStartTime) + " msec");
toMasterProgress = "replay journal";
long replayStartTime = System.currentTimeMillis();
// replay journals. -1 means replay all the journals larger than current journal id.
replayJournal(-1);
long replayEndTime = System.currentTimeMillis();
LOG.info("finish replay in " + (replayEndTime - replayStartTime) + " msec");

checkCurrentNodeExist();
checkCurrentNodeExist();

checkBeExecVersion();
checkBeExecVersion();

toMasterProgress = "roll editlog";
editLog.rollEditLog();
toMasterProgress = "roll editlog";
editLog.rollEditLog();

// Log meta_version
long journalVersion = MetaContext.get().getMetaVersion();
if (journalVersion < FeConstants.meta_version) {
toMasterProgress = "log meta version";
editLog.logMetaVersion(FeConstants.meta_version);
MetaContext.get().setMetaVersion(FeConstants.meta_version);
}
// Log meta_version
long journalVersion = MetaContext.get().getMetaVersion();
if (journalVersion < FeConstants.meta_version) {
toMasterProgress = "log meta version";
editLog.logMetaVersion(FeConstants.meta_version);
MetaContext.get().setMetaVersion(FeConstants.meta_version);
}

// Log the first frontend
if (isFirstTimeStartUp) {
// if isFirstTimeStartUp is true, frontends must contain this node.
Frontend self = frontends.get(nodeName);
Preconditions.checkNotNull(self);
// OP_ADD_FIRST_FRONTEND is emitted, so it can write to BDBJE even if canWrite is false
editLog.logAddFirstFrontend(self);
// Log the first frontend
if (isFirstTimeStartUp) {
// if isFirstTimeStartUp is true, frontends must contain this node.
Frontend self = frontends.get(nodeName);
Preconditions.checkNotNull(self);
// OP_ADD_FIRST_FRONTEND is emitted, so it can write to BDBJE even if canWrite is false
editLog.logAddFirstFrontend(self);

initLowerCaseTableNames();
// Set initial root password if master FE first time launch.
auth.setInitialRootPassword(Config.initial_root_password);
} else {
if (journalVersion <= FeMetaVersion.VERSION_114) {
// if journal version is less than 114, which means it is upgraded from version before 2.0.
// When upgrading from 1.2 to 2.0, we need to make sure that the parallelism of query remain unchanged
// when switch to pipeline engine, otherwise it may impact the load of entire cluster
// because the default parallelism of pipeline engine is higher than previous version.
// so set parallel_pipeline_task_num to parallel_fragment_exec_instance_num
int newVal = VariableMgr.newSessionVariable().parallelExecInstanceNum;
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.PARALLEL_PIPELINE_TASK_NUM,
String.valueOf(newVal));

// similar reason as above, need to upgrade broadcast scale factor during 1.2 to 2.x
// if the default value has been upgraded
double newBcFactorVal = VariableMgr.newSessionVariable().getBroadcastRightTableScaleFactor();
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR,
String.valueOf(newBcFactorVal));

// similar reason as above, need to upgrade enable_nereids_planner to true
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.ENABLE_NEREIDS_PLANNER,
"true");
}
if (journalVersion <= FeMetaVersion.VERSION_123) {
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", SessionVariable.ENABLE_NEREIDS_DML, "true");
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
SessionVariable.FRAGMENT_TRANSMISSION_COMPRESSION_CODEC, "none");
if (VariableMgr.newSessionVariable().nereidsTimeoutSecond == 5) {
initLowerCaseTableNames();
// Set initial root password if master FE first time launch.
auth.setInitialRootPassword(Config.initial_root_password);
} else {
if (journalVersion <= FeMetaVersion.VERSION_114) {
// if journal version is less than 114, which means it is upgraded from version before 2.0.
// When upgrading from 1.2 to 2.0,
// we need to make sure that the parallelism of query remain unchanged
// when switch to pipeline engine, otherwise it may impact the load of entire cluster
// because the default parallelism of pipeline engine is higher than previous version.
// so set parallel_pipeline_task_num to parallel_fragment_exec_instance_num
int newVal = VariableMgr.newSessionVariable().parallelExecInstanceNum;
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
SessionVariable.PARALLEL_PIPELINE_TASK_NUM,
String.valueOf(newVal));

// similar reason as above, need to upgrade broadcast scale factor during 1.2 to 2.x
// if the default value has been upgraded
double newBcFactorVal = VariableMgr.newSessionVariable().getBroadcastRightTableScaleFactor();
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR,
String.valueOf(newBcFactorVal));

// similar reason as above, need to upgrade enable_nereids_planner to true
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.ENABLE_NEREIDS_PLANNER,
"true");
}
if (journalVersion <= FeMetaVersion.VERSION_123) {
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", SessionVariable.ENABLE_NEREIDS_DML,
"true");
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
SessionVariable.NEREIDS_TIMEOUT_SECOND, "30");
SessionVariable.FRAGMENT_TRANSMISSION_COMPRESSION_CODEC, "none");
if (VariableMgr.newSessionVariable().nereidsTimeoutSecond == 5) {
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
SessionVariable.NEREIDS_TIMEOUT_SECOND, "30");
}
}
}
}

getPolicyMgr().createDefaultStoragePolicy();
getPolicyMgr().createDefaultStoragePolicy();

// MUST set master ip before starting checkpoint thread.
// because checkpoint thread need this info to select non-master FE to push image
// MUST set master ip before starting checkpoint thread.
// because checkpoint thread need this info to select non-master FE to push image

toMasterProgress = "log master info";
this.masterInfo = new MasterInfo(Env.getCurrentEnv().getSelfNode().getHost(),
Config.http_port,
Config.rpc_port);
editLog.logMasterInfo(masterInfo);
LOG.info("logMasterInfo:{}", masterInfo);
toMasterProgress = "log master info";
this.masterInfo = new MasterInfo(Env.getCurrentEnv().getSelfNode().getHost(),
Config.http_port,
Config.rpc_port);
editLog.logMasterInfo(masterInfo);
LOG.info("logMasterInfo:{}", masterInfo);

// for master, the 'isReady' is set behind.
// but we are sure that all metadata is replayed if we get here.
// so no need to check 'isReady' flag in this method
postProcessAfterMetadataReplayed(false);
// for master, the 'isReady' is set behind.
// but we are sure that all metadata is replayed if we get here.
// so no need to check 'isReady' flag in this method
postProcessAfterMetadataReplayed(false);

insertOverwriteManager.allTaskFail();
insertOverwriteManager.allTaskFail();

toMasterProgress = "start daemon threads";
toMasterProgress = "start daemon threads";

// start all daemon threads that run only on the MASTER FE
startMasterOnlyDaemonThreads();
// start the other daemon threads that should run on all FEs
startNonMasterDaemonThreads();
// start all daemon threads that run only on the MASTER FE
startMasterOnlyDaemonThreads();
// start the other daemon threads that should run on all FEs
startNonMasterDaemonThreads();

MetricRepo.init();
MetricRepo.init();

toMasterProgress = "finished";
canRead.set(true);
isReady.set(true);
checkLowerCaseTableNames();
toMasterProgress = "finished";
canRead.set(true);
isReady.set(true);
checkLowerCaseTableNames();

String msg = "master finished to replay journal, can write now.";
Util.stdoutWithTime(msg);
LOG.info(msg);
// for master, there are some new thread pools need to register metric
ThreadPoolManager.registerAllThreadPoolMetric();
if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
String msg = "master finished to replay journal, can write now.";
Util.stdoutWithTime(msg);
LOG.info(msg);
// for master, there are some new thread pools need to register metric
ThreadPoolManager.registerAllThreadPoolMetric();
if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
}
} catch (Throwable e) {
// When failed to transfer to master, we need to exit the process.
// Otherwise, the process will be in an unknown state.
LOG.error("failed to transfer to master. progress: {}", toMasterProgress, e);
System.exit(-1);
}
}

Expand Down Expand Up @@ -1718,36 +1728,43 @@ private void startNonMasterDaemonThreads() {
private void transferToNonMaster(FrontendNodeType newType) {
isReady.set(false);

if (feType == FrontendNodeType.OBSERVER || feType == FrontendNodeType.FOLLOWER) {
Preconditions.checkState(newType == FrontendNodeType.UNKNOWN);
LOG.warn("{} to UNKNOWN, still offer read service", feType.name());
// do not set canRead here; leave canRead as it was.
// if meta out of date, canRead will be set to false in replayer thread.
metaReplayState.setTransferToUnknown();
return;
}
try {
if (feType == FrontendNodeType.OBSERVER || feType == FrontendNodeType.FOLLOWER) {
Preconditions.checkState(newType == FrontendNodeType.UNKNOWN);
LOG.warn("{} to UNKNOWN, still offer read service", feType.name());
// do not set canRead here; leave canRead as it was.
// if meta out of date, canRead will be set to false in replayer thread.
metaReplayState.setTransferToUnknown();
return;
}

// transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER
// transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER

if (replayer == null) {
createReplayer();
replayer.start();
}
if (replayer == null) {
createReplayer();
replayer.start();
}

// 'isReady' will be set to true in 'setCanRead()' method
if (!postProcessAfterMetadataReplayed(true)) {
// the state has changed, exit early.
return;
}
// 'isReady' will be set to true in 'setCanRead()' method
if (!postProcessAfterMetadataReplayed(true)) {
// the state has changed, exit early.
return;
}

checkLowerCaseTableNames();
checkLowerCaseTableNames();

startNonMasterDaemonThreads();
startNonMasterDaemonThreads();

MetricRepo.init();
MetricRepo.init();

if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
}
} catch (Throwable e) {
// When failed to transfer to non-master, we need to exit the process.
// Otherwise, the process will be in an unknown state.
LOG.error("failed to transfer to non-master.", e);
System.exit(-1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public FeServer(int port) {

public void start() throws IOException {
FrontendServiceImpl service = new FrontendServiceImpl(ExecuteEnv.getInstance());
Logger feServiceLogger = LogManager.getLogger(FrontendServiceImpl.class);
Logger feServiceLogger = LogManager.getLogger(FeServer.class);
FrontendService.Iface instance = (FrontendService.Iface) Proxy.newProxyInstance(
FrontendServiceImpl.class.getClassLoader(),
FrontendServiceImpl.class.getInterfaces(),
Expand Down
Loading