Skip to content

Commit

Permalink
[branch-2.0](txn) be dead exceeds 5min abort its txns (#22781, #28662,
Browse files Browse the repository at this point in the history
…#35342) (#39317)

cherry-pick:  #22781,  #28662, #35342

---------

Co-authored-by: HHoflittlefish777 <[email protected]>
  • Loading branch information
yujun777 and sollhui authored Aug 14, 2024
1 parent adfef1d commit eaffb69
Show file tree
Hide file tree
Showing 33 changed files with 349 additions and 54 deletions.
4 changes: 4 additions & 0 deletions be/src/runtime/stream_load/stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
#include "thrift/protocol/TDebugProtocol.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/thrift_rpc_helper.h"
#include "util/time.h"
Expand Down Expand Up @@ -242,6 +243,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) {
request.__set_timeout(ctx->timeout_second);
}
request.__set_request_id(ctx->id.to_thrift());
request.__set_backend_id(_exec_env->master_info()->backend_id);

TLoadTxnBeginResult result;
Status status;
Expand Down Expand Up @@ -374,6 +376,8 @@ void StreamLoadExecutor::get_commit_request(StreamLoadContext* ctx,
}

Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
DBUG_EXECUTE_IF("StreamLoadExecutor.commit_txn.block", DBUG_BLOCK);

DorisMetrics::instance()->stream_load_txn_commit_request_total->increment(1);

TLoadTxnCommitRequest request;
Expand Down
10 changes: 10 additions & 0 deletions docs/en/docs/admin-manual/config/fe-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,16 @@ Is it possible to configure dynamically: true

Whether it is a configuration item unique to the Master FE node: true

#### `abort_txn_after_lost_heartbeat_time_second`

How long to wait after a BE's heartbeat is lost before aborting its transactions. The default value is 300, which means the transactions of a BE are aborted once its heartbeat has been lost for 300 seconds.

Default: 300(s)

Is it possible to configure dynamically: true

Whether it is a configuration item unique to the Master FE node: true

#### `enable_access_file_without_broker`

Default:false
Expand Down
10 changes: 10 additions & 0 deletions docs/zh-CN/docs/admin-manual/config/fe-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,16 @@ FE向BE的BackendService发送rpc请求时的超时时间,单位:毫秒。

是否为 Master FE 节点独有的配置项:true

#### `abort_txn_after_lost_heartbeat_time_second`

丢失 BE 心跳后中止该 BE 事务的等待时间。默认值为 300 秒,即 FE 持续 300 秒未收到某个 BE 的心跳时,会中止该 BE 的所有事务。

默认值:300(秒)

是否可以动态配置:true

是否为 Master FE 节点独有的配置项:true

#### `enable_access_file_without_broker`

默认值:false
Expand Down
14 changes: 14 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1812,6 +1812,20 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static long max_backend_heartbeat_failure_tolerance_count = 1;

/**
 * Time, in seconds, to wait after a backend's heartbeat is lost before aborting its transactions.
 * The default value is 300, which means the transactions of a backend are aborted once its
 * heartbeat has been lost for 300 seconds.
 */
@ConfField(mutable = true, masterOnly = true)
public static int abort_txn_after_lost_heartbeat_time_second = 300;

/**
 * Heartbeat interval, in seconds.
 * The default value is 5, which means the master sends a heartbeat to all backends every 5 seconds.
 * This item is declared with {@code mutable = false}, so it is not a dynamically configurable value.
 */
@ConfField(mutable = false, masterOnly = false)
public static int heartbeat_interval_second = 5;

/**
* The iceberg and hudi table will be removed in v1.3
* Use multi catalog instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.doris.planner.external.jdbc.JdbcTableSink;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TUniqueId;
Expand Down Expand Up @@ -358,7 +359,9 @@ public void analyze(Analyzer analyzer) throws UserException {
LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING;
transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
Lists.newArrayList(targetTable.getId()), label.getLabelName(),
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
}
isTransactionBegin = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

public class ClientPool {
static GenericKeyedObjectPoolConfig heartbeatConfig = new GenericKeyedObjectPoolConfig();
static int heartbeatTimeoutMs = FeConstants.heartbeat_interval_second * 1000;
static int heartbeatTimeoutMs = Config.heartbeat_interval_second * 1000;

static GenericKeyedObjectPoolConfig backendConfig = new GenericKeyedObjectPoolConfig();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public class FeConstants {
public static int shortkey_max_column_count = 3;
public static int shortkey_maxsize_bytes = 36;

public static int heartbeat_interval_second = 5;
public static int checkpoint_interval_second = 60; // 1 minutes

// dpp version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.httpv2.exception.UnauthorizedException;
Expand Down Expand Up @@ -229,6 +230,11 @@ private Object executeStreamLoad2PC(HttpServletRequest request, String db) {
}

private TNetworkAddress selectRedirectBackend(String clusterName) throws LoadException {
long debugBackendId = DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId", -1L);
if (debugBackendId != -1L) {
Backend backend = Env.getCurrentSystemInfo().getBackend(debugBackendId);
return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
}
String qualifiedUser = ConnectContext.get().getQualifiedUser();
Set<Tag> userTags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.QueryStateException;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
Expand Down Expand Up @@ -245,7 +246,9 @@ public void process(DeleteStmt stmt) throws DdlException, QueryStateException {
// begin txn here and generate txn id
transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
Lists.newArrayList(olapTable.getId()), label, null,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
TransactionState.LoadJobSourceType.FRONTEND, jobId, Config.stream_load_default_timeout_second);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.doris.load.FailMsg;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
Expand Down Expand Up @@ -104,7 +105,9 @@ public void beginTxn()
QuotaExceedException, MetaNotFoundException {
transactionId = Env.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id,
getTimeout());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.FailMsg;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.sparkdpp.DppResult;
import org.apache.doris.sparkdpp.EtlJobConfig;
Expand Down Expand Up @@ -198,7 +199,9 @@ public void beginTxn()
QuotaExceedException, MetaNotFoundException {
transactionId = Env.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
LoadJobSourceType.FRONTEND, id, getTimeout());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.transaction.BeginTransactionException;
Expand Down Expand Up @@ -206,7 +207,9 @@ public boolean beginTxn() throws UserException {
try {
txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(routineLoadJob.getDbId(),
Lists.newArrayList(routineLoadJob.getTableId()), DebugUtil.printId(id), null,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob.getId(),
timeoutMs / 1000);
} catch (DuplicatedRequestException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.load.sync.model.Data;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.InsertStreamTxnExecutor;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.SyncTask;
import org.apache.doris.task.SyncTaskPool;
Expand Down Expand Up @@ -133,8 +134,10 @@ public void beginTxn(long batchId) throws UserException, TException, TimeoutExce
try {
long txnId = globalTransactionMgr.beginTransaction(db.getId(),
Lists.newArrayList(tbl.getId()), label,
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
request = new TStreamLoadPutRequest()
.setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TQueryType;
Expand Down Expand Up @@ -80,7 +81,9 @@ public Transaction(ConnectContext ctx, Database database, Table table, String la
this.coordinator = new Coordinator(ctx, null, planner, ctx.getStatsErrorEstimator());
this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
database.getId(), ImmutableList.of(table.getId()), labelName,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout());
this.createAt = System.currentTimeMillis();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Reference;
import org.apache.doris.common.UserException;
Expand Down Expand Up @@ -176,7 +175,7 @@ public static void addToBlacklist(Long backendID, String reason) {
return;
}

blacklistBackends.put(backendID, Pair.of(FeConstants.heartbeat_interval_second + 1, reason));
blacklistBackends.put(backendID, Pair.of(Config.heartbeat_interval_second + 1, reason));
LOG.warn("add backend {} to black list. reason: {}", backendID, reason);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.util.InternalQueryBuffer;
Expand Down Expand Up @@ -1908,9 +1909,10 @@ private void beginTxn(String dbName, String tblName) throws UserException, TExce
String label = txnEntry.getLabel();
if (Env.getCurrentEnv().isMaster()) {
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
label, new TransactionState.TxnCoordinator(
TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), label,
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
txnConf.setTxnId(txnId);
String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ public class ExecuteEnv {
private static volatile ExecuteEnv INSTANCE;
private MultiLoadMgr multiLoadMgr;
private ConnectScheduler scheduler;
private long startupTime;

/**
 * Builds the execution environment: creates the multi-load manager and the connection
 * scheduler (sized by {@code Config.qe_max_connection}) and records the construction time.
 */
private ExecuteEnv() {
    this.multiLoadMgr = new MultiLoadMgr();
    this.scheduler = new ConnectScheduler(Config.qe_max_connection);
    // Captured once here and later exposed through getStartupTime().
    this.startupTime = System.currentTimeMillis();
}

public static ExecuteEnv getInstance() {
Expand All @@ -50,4 +52,9 @@ public ConnectScheduler getScheduler() {
/**
 * Returns the multi-load manager held by this execution environment.
 *
 * @return the {@code MultiLoadMgr} created in the constructor
 */
public MultiLoadMgr getMultiLoadMgr() {
    return this.multiLoadMgr;
}

/**
 * Returns the time at which this execution environment was constructed.
 *
 * @return the value of {@code System.currentTimeMillis()} recorded in the constructor,
 *         in milliseconds
 */
public long getStartupTime() {
    return this.startupTime;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1249,10 +1249,12 @@ private TLoadTxnBeginResult loadTxnBeginImpl(TLoadTxnBeginRequest request, Strin
OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl, TableType.OLAP);
// begin
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;
Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId());
long startTime = backend != null ? backend.getLastStartTime() : 0;
TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, request.getBackendId(), clientIp, startTime);
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
db.getId(), Lists.newArrayList(table.getId()), request.getLabel(), request.getRequestId(),
new TxnCoordinator(TxnSourceType.BE, clientIp),
TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond);
txnCoord, TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond);
TLoadTxnBeginResult result = new TLoadTxnBeginResult();
result.setTxnId(txnId).setDbId(db.getId());
return result;
Expand Down Expand Up @@ -1356,10 +1358,12 @@ private TBeginTxnResult beginTxnImpl(TBeginTxnRequest request, String clientIp)
// step 5: get timeout
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;

Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId());
long startTime = backend != null ? backend.getLastStartTime() : 0;
TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, request.getBackendId(), clientIp, startTime);
// step 6: begin transaction
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
db.getId(), tableIdList, request.getLabel(), request.getRequestId(),
new TxnCoordinator(TxnSourceType.BE, clientIp),
db.getId(), tableIdList, request.getLabel(), request.getRequestId(), txnCoord,
TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond);

// step 7: return result
Expand Down
18 changes: 13 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class HeartbeatMgr extends MasterDaemon {
private static volatile AtomicReference<TMasterInfo> masterInfo = new AtomicReference<>();

public HeartbeatMgr(SystemInfoService nodeMgr, boolean needRegisterMetric) {
super("heartbeat mgr", FeConstants.heartbeat_interval_second * 1000);
super("heartbeat mgr", Config.heartbeat_interval_second * 1000);
this.nodeMgr = nodeMgr;
this.executor = ThreadPoolManager.newDaemonFixedThreadPool(Config.heartbeat_mgr_threads_num,
Config.heartbeat_mgr_blocking_queue_size, "heartbeat-mgr-pool", needRegisterMetric);
Expand Down Expand Up @@ -168,13 +168,21 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) {
BackendHbResponse hbResponse = (BackendHbResponse) response;
Backend be = nodeMgr.getBackend(hbResponse.getBeId());
if (be != null) {
long oldStartTime = be.getLastStartTime();
boolean isChanged = be.handleHbResponse(hbResponse, isReplay);
if (hbResponse.getStatus() != HbStatus.OK) {
if (hbResponse.getStatus() == HbStatus.OK) {
long newStartTime = be.getLastStartTime();
if (!isReplay && oldStartTime != newStartTime) {
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeRestart(
be.getId(), be.getHost(), newStartTime);
}
} else {
// invalid all connections cached in ClientPool
ClientPool.backendPool.clearPool(new TNetworkAddress(be.getHost(), be.getBePort()));
if (!isReplay) {
Env.getCurrentEnv().getGlobalTransactionMgr()
.abortTxnWhenCoordinateBeDown(be.getHost(), 100);
if (!isReplay && System.currentTimeMillis() - be.getLastUpdateMs()
>= Config.abort_txn_after_lost_heartbeat_time_second * 1000L) {
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeDown(
be.getId(), be.getHost(), 100);
}
}
return isChanged;
Expand Down
Loading

0 comments on commit eaffb69

Please sign in to comment.