From be464d2888b7a1432b973abc6938cc730476ee9b Mon Sep 17 00:00:00 2001 From: yujun Date: Sun, 14 Jan 2024 10:27:35 +0800 Subject: [PATCH] update --- .../olap/task/engine_publish_version_task.cpp | 11 +++-- be/src/olap/txn_manager.cpp | 41 +++++++++++++++---- .../transaction/DatabaseTransactionMgr.java | 35 +++++++++++++--- 3 files changed, 69 insertions(+), 18 deletions(-) diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 748a6e9b52d761..e9c91efb6b6947 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -94,13 +94,16 @@ Status EnginePublishVersionTask::finish() { VLOG_NOTICE << "begin to process publish version. transaction_id=" << transaction_id; DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.random", { if (rand() % 100 < (100 * dp->param("percent", 0.5))) { - LOG_WARNING("EnginePublishVersionTask.finish.random random failed"); + LOG_WARNING("EnginePublishVersionTask.finish.random random failed") + .tag("txn_id", transaction_id); return Status::InternalError("debug engine publish version task random failed"); } }); DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.wait", { if (auto wait = dp->param("duration", 0); wait > 0) { - LOG_WARNING("EnginePublishVersionTask.finish.wait wait").tag("wait ms", wait); + LOG_WARNING("EnginePublishVersionTask.finish.wait wait") + .tag("txn_id", transaction_id) + .tag("wait ms", wait); std::this_thread::sleep_for(std::chrono::milliseconds(wait)); } }); @@ -199,7 +202,9 @@ Status EnginePublishVersionTask::finish() { partition_id, tablet_info.tablet_id, version.first); } res = Status::Error( - "check_version_exist failed"); + "version not continuous for mow, tablet_id={}, " + "tablet_max_version={}, txn_version={}", + tablet_info.tablet_id, max_version, version.first); int64_t missed_version = max_version + 1; int64_t missed_txn_id = StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version( diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index be52c6c7d4480f..71f4e22e075e4c 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -103,13 +103,18 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transac DBUG_EXECUTE_IF("TxnManager.prepare_txn.random_failed", { if (rand() % 100 < (100 * dp->param("percent", 0.5))) { - LOG_WARNING("TxnManager.prepare_txn.random_failed random failed"); + LOG_WARNING("TxnManager.prepare_txn.random_failed random failed") + .tag("txn_id", transaction_id) + .tag("tablet_id", tablet_id); return Status::InternalError("debug prepare txn random failed"); } }); DBUG_EXECUTE_IF("TxnManager.prepare_txn.wait", { if (auto wait = dp->param("duration", 0); wait > 0) { - LOG_WARNING("TxnManager.prepare_txn.wait").tag("wait ms", wait); + LOG_WARNING("TxnManager.prepare_txn.wait") + .tag("txn_id", transaction_id) + .tag("tablet_id", tablet_id) + .tag("wait ms", wait); std::this_thread::sleep_for(std::chrono::milliseconds(wait)); } }); @@ -274,13 +279,18 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, DBUG_EXECUTE_IF("TxnManager.commit_txn.random_failed", { if (rand() % 100 < (100 * dp->param("percent", 0.5))) { - LOG_WARNING("TxnManager.commit_txn.random_failed"); + LOG_WARNING("TxnManager.commit_txn.random_failed") + .tag("txn_id", transaction_id) + .tag("tablet_id", tablet_id); return Status::InternalError("debug commit txn random failed"); } }); DBUG_EXECUTE_IF("TxnManager.commit_txn.wait", { if (auto wait = dp->param("duration", 0); wait > 0) { - LOG_WARNING("TxnManager.commit_txn.wait").tag("wait ms", wait); + LOG_WARNING("TxnManager.commit_txn.wait") + .tag("txn_id", transaction_id) + .tag("tablet_id", tablet_id) + .tag("wait ms", wait); std::this_thread::sleep_for(std::chrono::milliseconds(wait)); } }); @@ -347,7 +357,10 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, rowset_ptr->rowset_meta()->get_rowset_pb(), false); DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", { if (auto wait = dp->param("duration", 0); wait > 0) { - LOG_WARNING("TxnManager.RowsetMetaManager.save_wait").tag("wait ms", wait); + LOG_WARNING("TxnManager.RowsetMetaManager.save_wait") + .tag("txn_id", transaction_id) + .tag("tablet_id", tablet_id) + .tag("wait ms", wait); std::this_thread::sleep_for(std::chrono::milliseconds(wait)); } }); @@ -426,13 +439,18 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, } DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", { if (rand() % 100 < (100 * dp->param("percent", 0.5))) { - LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta"); + LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta") + .tag("txn_id", transaction_id) + .tag("tablet_id", tablet_id); return Status::InternalError("debug publish txn before save rs meta random failed"); } }); DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_before_save_rs_meta", { if (auto wait = dp->param("duration", 0); wait > 0) { - LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta").tag("wait ms", wait); + LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta") + .tag("txn_id", transaction_id) + .tag("tablet_id", tablet_id) + .tag("wait ms", wait); std::this_thread::sleep_for(std::chrono::milliseconds(wait)); } }); @@ -446,13 +464,18 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta", { if (rand() % 100 < (100 * dp->param("percent", 0.5))) { - LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta"); + LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta") + .tag("txn_id", transaction_id) + .tag("tablet_id", tablet_id); return Status::InternalError("debug publish txn after save rs meta random failed"); } }); DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_after_save_rs_meta", { if (auto wait = dp->param("duration", 0); wait > 0) { - LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta").tag("wait ms", wait); + LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta") + .tag("txn_id", transaction_id) + .tag("tablet_id", tablet_id) + .tag("wait ms", wait); std::this_thread::sleep_for(std::chrono::milliseconds(wait)); } }); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 8c9000cee7fd62..6044cec35f53e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -581,10 +581,12 @@ private void checkCommitStatus(List tableList, TransactionState transacti String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas, tabletVersionFailedReplicas); - String errMsg = String.format("Failed to commit txn %s, cause tablet %s succ replica " - + "num %s < quorum replica num %s. table %s, partition %s, this tablet detail: %s", + String errMsg = String.format("Failed to commit txn %s, cause tablet %s succ replica num %s" + + " < load required replica num %s. table %s, partition: [ id=%s, commit version %s" + + ", visible version %s ], this tablet detail: %s", transactionId, tablet.getId(), successReplicaNum, quorumReplicaNum, tableId, - partition.getId(), writeDetail); + partition.getId(), partition.getCommittedVersion(), partition.getVisibleVersion(), + writeDetail); LOG.info(errMsg); throw new TabletQuorumFailedException(transactionId, errMsg); @@ -739,7 +741,7 @@ public void commitTransaction(List
tableList, long transactionId, List errorReplicaIds = transactionState.getErrorReplicas(); + List tabletSuccReplicas = Lists.newArrayList(); + List tabletFailedReplicas = Lists.newArrayList(); for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) { long tableId = tableCommitInfo.getTableId(); OlapTable table = (OlapTable) db.getTableNullable(tableId); @@ -1747,13 +1751,32 @@ private void updateCatalogAfterCommitted(TransactionState transactionState, Data for (MaterializedIndex index : allIndices) { List tablets = index.getTablets(); for (Tablet tablet : tablets) { + tabletFailedReplicas.clear(); + tabletSuccReplicas.clear(); for (Replica replica : tablet.getReplicas()) { if (errorReplicaIds.contains(replica.getId())) { // TODO(cmy): do we need to update last failed version here? // because in updateCatalogAfterVisible, it will be updated again. replica.updateLastFailedVersion(partitionCommitInfo.getVersion()); + tabletFailedReplicas.add(replica); + } else { + tabletSuccReplicas.add(replica); } } + if (!isReplay && !tabletFailedReplicas.isEmpty()) { + LOG.info("some replicas load data failed for committed txn {} on version {}, table {}, " + + "partition {}, tablet {}, {} replicas load data succ: {}, {} replicas load " + + "data fail: {}", + transactionState.getTransactionId(), partitionCommitInfo.getVersion(), + tableId, partitionId, tablet.getId(), tabletSuccReplicas.size(), + Joiner.on(", ").join(tabletSuccReplicas.stream() + .map(replica -> replica.toStringSimple(true)) + .collect(Collectors.toList())), + tabletFailedReplicas.size(), + Joiner.on(", ").join(tabletFailedReplicas.stream() + .map(replica -> replica.toStringSimple(true)) + .collect(Collectors.toList()))); + } } } partition.setNextVersion(partition.getNextVersion() + 1); @@ -1967,7 +1990,7 @@ public void replayUpsertTransactionState(TransactionState transactionState) thro transactionState.replaySetTransactionStatus(); if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) { LOG.info("replay a committed transaction {}", transactionState); - updateCatalogAfterCommitted(transactionState, db); + updateCatalogAfterCommitted(transactionState, db, true); } else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { LOG.info("replay a visible transaction {}", transactionState); updateCatalogAfterVisible(transactionState, db);