diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index fd5258e0c089e7..34160ab75d7fed 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -439,6 +439,13 @@ public class Config extends ConfigBase {
"Maximal waiting time for all publish version tasks of one transaction to be finished, in seconds."})
public static int publish_version_timeout_second = 30; // 30 seconds
+ @ConfField(mutable = true, masterOnly = true, description = {"导入 Publish 阶段的等待时间,单位是秒。超过此时间,"
+ + "则只需每个tablet包含一个成功副本,则导入成功。值为 -1 时,表示无限等待。",
+ "Waiting time for one transaction changing to \"at least one replica success\", in seconds."
+ + "If time exceeds this, and for each tablet it has at least one replica publish successful, "
+ + "then the load task will be successful." })
+ public static int publish_wait_time_second = 300;
+
@ConfField(mutable = true, masterOnly = true, description = {"提交事务的最大超时时间,单位是秒。"
+ "该参数仅用于事务型 insert 操作中。",
"Maximal waiting time for all data inserted before one transaction to be committed, in seconds. "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index c444cdf1db6d98..c76dc5e0ee3587 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -487,6 +487,32 @@ public String toString() {
return strBuffer.toString();
}
+ public String toStringSimple(boolean checkBeAlive) {
+ StringBuilder strBuffer = new StringBuilder("[replicaId=");
+ strBuffer.append(id);
+ strBuffer.append(", backendId=");
+ strBuffer.append(backendId);
+ if (checkBeAlive) {
+ strBuffer.append(", backendAlive=");
+ strBuffer.append(Env.getCurrentSystemInfo().checkBackendAlive(backendId));
+ }
+ strBuffer.append(", version=");
+ strBuffer.append(version);
+ if (lastFailedVersion > 0) {
+ strBuffer.append(", lastFailedVersion=");
+ strBuffer.append(lastFailedVersion);
+ strBuffer.append(", lastSuccessVersion=");
+ strBuffer.append(lastSuccessVersion);
+ strBuffer.append(", lastFailedTimestamp=");
+ strBuffer.append(lastFailedTimestamp);
+ }
+ strBuffer.append(", state=");
+ strBuffer.append(state.name());
+ strBuffer.append("]");
+
+ return strBuffer.toString();
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(id);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index dba5d24855beac..8397499ded3e27 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -81,7 +81,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
import java.util.stream.Collectors;
/**
@@ -95,6 +94,12 @@
public class DatabaseTransactionMgr {
+ private enum PublishResult {
+ FAILED,
+ TIMEOUT_SUCC, // each tablet has least one replica succ, and timeout
+ QUORUM_SUCC, // each tablet has least quorum replicas succ
+ }
+
private static final Logger LOG = LogManager.getLogger(DatabaseTransactionMgr.class);
// the max number of txn that can be remove per round.
// set it to avoid holding lock too long when removing too many txns per round.
@@ -485,32 +490,9 @@ private void checkCommitStatus(List
tableList, TransactionState transacti
}
tabletToBackends.get(tabletId).add(tabletCommitInfos.get(i).getBackendId());
}
- List tabletSuccReplicas = Lists.newArrayList();
- List tabletWriteFailedReplicas = Lists.newArrayList();
- List tabletVersionFailedReplicas = Lists.newArrayList();
- Function getReplicaInfo = replica -> {
- StringBuilder strBuffer = new StringBuilder("[replicaId=");
- strBuffer.append(replica.getId());
- strBuffer.append(", backendId=");
- strBuffer.append(replica.getBackendId());
- strBuffer.append(", backendAlive=");
- strBuffer.append(Env.getCurrentSystemInfo().checkBackendAlive(replica.getBackendId()));
- strBuffer.append(", version=");
- strBuffer.append(replica.getVersion());
- if (replica.getLastFailedVersion() >= 0) {
- strBuffer.append(", lastFailedVersion=");
- strBuffer.append(replica.getLastFailedVersion());
- strBuffer.append(", lastSuccessVersion=");
- strBuffer.append(replica.getLastSuccessVersion());
- strBuffer.append(", lastFailedTimestamp=");
- strBuffer.append(replica.getLastFailedTimestamp());
- }
- strBuffer.append(", state=");
- strBuffer.append(replica.getState().name());
- strBuffer.append("]");
-
- return strBuffer.toString();
- };
+ List tabletSuccReplicas = Lists.newArrayList();
+ List tabletWriteFailedReplicas = Lists.newArrayList();
+ List tabletVersionFailedReplicas = Lists.newArrayList();
for (long tableId : tableToPartition.keySet()) {
OlapTable table = (OlapTable) db.getTableOrMetaException(tableId);
@@ -558,15 +540,12 @@ private void checkCommitStatus(List tableList, TransactionState transacti
tabletSuccReplicas.clear();
tabletWriteFailedReplicas.clear();
tabletVersionFailedReplicas.clear();
- int successReplicaNum = 0;
long tabletId = tablet.getId();
Set tabletBackends = tablet.getBackendIds();
totalInvolvedBackends.addAll(tabletBackends);
Set commitBackends = tabletToBackends.get(tabletId);
// save the error replica ids for current tablet
// this param is used for log
- Set errorBackendIdsForTablet = Sets.newHashSet();
- String errorReplicaInfo = new String();
for (long tabletBackend : tabletBackends) {
Replica replica = tabletInvertedIndex.getReplica(tabletId, tabletBackend);
if (replica == null) {
@@ -582,55 +561,28 @@ private void checkCommitStatus(List tableList, TransactionState transacti
// ignore it but not log it
// for example, a replica is in clone state
if (replica.getLastFailedVersion() < 0) {
- ++successReplicaNum;
- tabletSuccReplicas.add(getReplicaInfo.apply(replica));
+ tabletSuccReplicas.add(replica);
} else {
- errorReplicaInfo += " replica [" + replica.getId() + "], lastFailedVersion ["
- + replica.getLastFailedVersion() + "]";
- tabletVersionFailedReplicas.add(getReplicaInfo.apply(replica));
+ tabletVersionFailedReplicas.add(replica);
}
} else {
- tabletWriteFailedReplicas.add(getReplicaInfo.apply(replica));
- errorBackendIdsForTablet.add(tabletBackend);
+ tabletWriteFailedReplicas.add(replica);
errorReplicaIds.add(replica.getId());
- // not remove rollup task here, because the commit maybe failed
- // remove rollup task when commit successfully
- errorReplicaInfo += " replica [" + replica.getId() + "] commitBackends null or "
- + "tabletBackend [" + tabletBackend + "] does not "
- + "in commitBackends";
}
}
+ int successReplicaNum = tabletSuccReplicas.size();
if (successReplicaNum < quorumReplicaNum) {
- LOG.warn("Failed to commit txn [{}]. "
- + "Tablet [{}] success replica num is {} < quorum replica num {} "
- + "while error backends {} error replica info {} commitBackends {}",
- transactionState.getTransactionId(), tablet.getId(), successReplicaNum,
- quorumReplicaNum, Joiner.on(",").join(errorBackendIdsForTablet),
- errorReplicaInfo, commitBackends);
-
- String replicasDetailMsg = "";
- if (!tabletSuccReplicas.isEmpty()) {
- replicasDetailMsg += String.format("%s replicas final succ: { %s }; ",
- tabletSuccReplicas.size(), Joiner.on(", ").join(tabletSuccReplicas));
- }
- if (!tabletWriteFailedReplicas.isEmpty()) {
- replicasDetailMsg += String.format("%s replicas write data failed: { %s }; ",
- tabletWriteFailedReplicas.size(),
- Joiner.on(", ").join(tabletWriteFailedReplicas));
- }
- if (!tabletVersionFailedReplicas.isEmpty()) {
- replicasDetailMsg += String.format("%s replicas write data succ but miss previous "
- + "version: { %s }.",
- tabletVersionFailedReplicas.size(),
- Joiner.on(", ").join(tabletVersionFailedReplicas));
- }
+ String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
+ tabletVersionFailedReplicas);
+
+ String errMsg = String.format("Failed to commit txn %s, cause tablet %s succ replica "
+ + "num %s < quorum replica num %s. table %s, partition %s, this tablet detail: %s",
+ transactionId, tablet.getId(), successReplicaNum, quorumReplicaNum, tableId,
+ partition.getId(), writeDetail);
+ LOG.info(errMsg);
- throw new TabletQuorumFailedException(transactionId, String.format(
- "Failed to commit txn %s, cause tablet %s succ replica num %s < quorum "
- + " replica num %s. table %s, partition %s, this tablet detail: %s",
- transactionId, tablet.getId(), successReplicaNum, quorumReplicaNum, tableId,
- partition.getId(), replicasDetailMsg));
+ throw new TabletQuorumFailedException(transactionId, errMsg);
}
}
}
@@ -638,6 +590,32 @@ private void checkCommitStatus(List tableList, TransactionState transacti
}
}
+ private String getTabletWriteDetail(List tabletSuccReplicas, List tabletWriteFailedReplicas,
+ List tabletVersionFailedReplicas) {
+ String writeDetail = "";
+ if (!tabletSuccReplicas.isEmpty()) {
+ writeDetail += String.format("%s replicas final succ: { %s }; ",
+ tabletSuccReplicas.size(), Joiner.on(", ").join(
+ tabletSuccReplicas.stream().map(replica -> replica.toStringSimple(true))
+ .collect(Collectors.toList())));
+ }
+ if (!tabletWriteFailedReplicas.isEmpty()) {
+ writeDetail += String.format("%s replicas write data failed: { %s }; ",
+ tabletWriteFailedReplicas.size(), Joiner.on(", ").join(
+ tabletWriteFailedReplicas.stream().map(replica -> replica.toStringSimple(true))
+ .collect(Collectors.toList())));
+ }
+ if (!tabletVersionFailedReplicas.isEmpty()) {
+ writeDetail += String.format("%s replicas write data succ but miss previous "
+ + "version: { %s }.",
+ tabletVersionFailedReplicas.size(), Joiner.on(",").join(
+ tabletVersionFailedReplicas.stream().map(replica -> replica.toStringSimple(true))
+ .collect(Collectors.toList())));
+ }
+
+ return writeDetail;
+ }
+
/**
* commit transaction process as follows:
* 1. validate whether `Load` is cancelled
@@ -907,6 +885,18 @@ public void finishTransaction(long transactionId, Set errorReplicaIds) thr
errorReplicaIds.addAll(originalErrorReplicas);
}
+ long now = System.currentTimeMillis();
+ long firstPublishOneSuccTime = transactionState.getFirstPublishOneSuccTime();
+ boolean allowPublishOneSucc = false;
+ if (Config.publish_wait_time_second > 0 && firstPublishOneSuccTime > 0
+ && now >= firstPublishOneSuccTime + Config.publish_wait_time_second * 1000L) {
+ allowPublishOneSucc = true;
+ }
+
+ List tabletSuccReplicas = Lists.newArrayList();
+ List tabletWriteFailedReplicas = Lists.newArrayList();
+ List tabletVersionFailedReplicas = Lists.newArrayList();
+
// case 1 If database is dropped, then we just throw MetaNotFoundException, because all related tables are
// already force dropped, we just ignore the transaction with all tables been force dropped.
// case 2 If at least one table lock successfully, which means that the transaction should be finished for
@@ -920,8 +910,9 @@ public void finishTransaction(long transactionId, Set errorReplicaIds) thr
LOG.debug("finish transaction {} with tables {}", transactionId, tableIdList);
List extends TableIf> tableList = db.getTablesOnIdOrderIfExist(tableIdList);
tableList = MetaLockUtils.writeLockTablesIfExist(tableList);
+ PublishResult publishResult = PublishResult.QUORUM_SUCC;
try {
- boolean hasError = false;
+ boolean allTabletsLeastOneSucc = true;
Iterator tableCommitInfoIterator
= transactionState.getIdToTableCommitInfos().values().iterator();
while (tableCommitInfoIterator.hasNext()) {
@@ -985,48 +976,87 @@ public void finishTransaction(long transactionId, Set errorReplicaIds) thr
// Here we only check number, the replica version will be updated in updateCatalogAfterVisible()
for (MaterializedIndex index : allIndices) {
for (Tablet tablet : index.getTablets()) {
- int healthReplicaNum = 0;
+ tabletSuccReplicas.clear();
+ tabletWriteFailedReplicas.clear();
+ tabletVersionFailedReplicas.clear();
for (Replica replica : tablet.getReplicas()) {
if (!errorReplicaIds.contains(replica.getId())) {
if (replica.checkVersionCatchUp(partition.getVisibleVersion(), true)) {
- ++healthReplicaNum;
+ tabletSuccReplicas.add(replica);
} else {
- LOG.info("publish version {} failed for transaction {} on tablet {},"
- + " on replica {} due to not catchup",
- partitionCommitInfo.getVersion(), transactionState, tablet,
- replica);
+ tabletVersionFailedReplicas.add(replica);
}
} else if (replica.getVersion() >= partitionCommitInfo.getVersion()) {
// the replica's version is larger than or equal to current transaction
// partition's version the replica is normal, then remove it from error replica ids
// TODO(cmy): actually I have no idea why we need this check
+ tabletSuccReplicas.add(replica);
errorReplicaIds.remove(replica.getId());
- ++healthReplicaNum;
} else {
- LOG.info("publish version {} failed for transaction {} on tablet {},"
- + " on replica {} due to version hole or error",
- partitionCommitInfo.getVersion(), transactionState, tablet, replica);
+ tabletWriteFailedReplicas.add(replica);
+ }
+ }
+
+ int healthReplicaNum = tabletSuccReplicas.size();
+ if (healthReplicaNum >= quorumReplicaNum) {
+ if (!tabletWriteFailedReplicas.isEmpty() || !tabletVersionFailedReplicas.isEmpty()) {
+ String writeDetail = getTabletWriteDetail(tabletSuccReplicas,
+ tabletWriteFailedReplicas, tabletVersionFailedReplicas);
+ LOG.info("publish version quorum succ for transaction {} on tablet {} with version"
+ + " {}, and has failed replicas, quorum num {}. table {}, partition {},"
+ + " tablet detail: {}",
+ transactionState, tablet, partitionCommitInfo.getVersion(),
+ quorumReplicaNum, tableId, partitionId, writeDetail);
}
+ continue;
}
- if (healthReplicaNum < quorumReplicaNum) {
- LOG.info("publish version {} failed for transaction {} on tablet {},"
- + " with only {} replicas less than quorum {}",
- partitionCommitInfo.getVersion(), transactionState, tablet, healthReplicaNum,
- quorumReplicaNum);
+ if (healthReplicaNum == 0) {
+ allTabletsLeastOneSucc = false;
+ }
+
+ String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
+ tabletVersionFailedReplicas);
+ if (allowPublishOneSucc && healthReplicaNum > 0) {
+ if (publishResult == PublishResult.QUORUM_SUCC) {
+ publishResult = PublishResult.TIMEOUT_SUCC;
+ }
+ // We can not do any thing except retrying,
+ // because publish task is assigned a version,
+ // and doris does not permit discontinuous
+ // versions.
+ //
+ // If a timeout happens, it means that the rowset
+ // that are being publised exists on a few replicas we should go
+ // ahead, otherwise data may be lost and thre
+ // publish task hangs forever.
+ LOG.info("publish version timeout succ for transaction {} on tablet {} with version"
+ + " {}, and has failed replicas, quorum num {}. table {}, partition {},"
+ + " tablet detail: {}",
+ transactionState, tablet, partitionCommitInfo.getVersion(), quorumReplicaNum,
+ tableId, partitionId, writeDetail);
+ } else {
+ publishResult = PublishResult.FAILED;
String errMsg = String.format("publish on tablet %d failed."
+ " succeed replica num %d less than quorum %d."
+ " table: %d, partition: %d, publish version: %d",
tablet.getId(), healthReplicaNum, quorumReplicaNum, tableId,
partitionId, partition.getVisibleVersion() + 1);
transactionState.setErrorMsg(errMsg);
- hasError = true;
+ LOG.info("publish version failed for transaction {} on tablet {} with version"
+ + " {}, and has failed replicas, quorum num {}. table {}, partition {},"
+ + " tablet detail: {}",
+ transactionState, tablet, partitionCommitInfo.getVersion(), quorumReplicaNum,
+ tableId, partitionId, writeDetail);
}
}
}
}
}
- if (hasError) {
+ if (allTabletsLeastOneSucc && firstPublishOneSuccTime <= 0) {
+ transactionState.setFirstPublishOneSuccTime(now);
+ }
+ if (publishResult == PublishResult.FAILED) {
return;
}
boolean txnOperated = false;
@@ -1060,7 +1090,7 @@ public void finishTransaction(long transactionId, Set errorReplicaIds) thr
// Otherwise, there is no way for stream load to query the result right after loading finished,
// even if we call "sync" before querying.
transactionState.countdownVisibleLatch();
- LOG.info("finish transaction {} successfully", transactionState);
+ LOG.info("finish transaction {} successfully, publish result: {}", transactionState, publishResult.name());
}
protected void unprotectedPreCommitTransaction2PC(TransactionState transactionState, Set errorReplicaIds,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index b15329e6b0dfec..897bc3b63b8ff7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -219,8 +219,14 @@ public String toString() {
private long publishVersionTime = -1;
private TransactionStatus preStatus = null;
+ // When publish txn, if every tablet has at least 1 replica published succ, but not quorum replicas succ,
+ // and time since firstPublishOneSuccTime has exceeds Config.publish_wait_time_second,
+ // then this transaction will become visible.
+ private long firstPublishOneSuccTime = -1;
+
@SerializedName(value = "callbackId")
private long callbackId = -1;
+
// In the beforeStateTransform() phase, we will get the callback object through the callbackId,
// and if we get it, we will save it in this variable.
// The main function of this variable is to retain a reference to this callback object.
@@ -387,6 +393,14 @@ public String getErrorLogUrl() {
return errorLogUrl;
}
+ public long getFirstPublishOneSuccTime() {
+ return firstPublishOneSuccTime;
+ }
+
+ public void setFirstPublishOneSuccTime(long firstPublishOneSuccTime) {
+ this.firstPublishOneSuccTime = firstPublishOneSuccTime;
+ }
+
public void setTransactionStatus(TransactionStatus transactionStatus) {
// status changed
this.preStatus = this.transactionStatus;