Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
yujun777 committed May 20, 2024
1 parent 7bee558 commit be464d2
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 18 deletions.
11 changes: 8 additions & 3 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,16 @@ Status EnginePublishVersionTask::finish() {
VLOG_NOTICE << "begin to process publish version. transaction_id=" << transaction_id;
DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.random", {
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
LOG_WARNING("EnginePublishVersionTask.finish.random random failed");
LOG_WARNING("EnginePublishVersionTask.finish.random random failed")
.tag("txn_id", transaction_id);
return Status::InternalError("debug engine publish version task random failed");
}
});
DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.wait", {
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
LOG_WARNING("EnginePublishVersionTask.finish.wait wait").tag("wait ms", wait);
LOG_WARNING("EnginePublishVersionTask.finish.wait wait")
.tag("txn_id", transaction_id)
.tag("wait ms", wait);
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});
Expand Down Expand Up @@ -199,7 +202,9 @@ Status EnginePublishVersionTask::finish() {
partition_id, tablet_info.tablet_id, version.first);
}
res = Status::Error<PUBLISH_VERSION_NOT_CONTINUOUS>(
"check_version_exist failed");
"version not continuous for mow, tablet_id={}, "
"tablet_max_version={}, txn_version={}",
tablet_info.tablet_id, max_version, version.first);
int64_t missed_version = max_version + 1;
int64_t missed_txn_id =
StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version(
Expand Down
41 changes: 32 additions & 9 deletions be/src/olap/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,18 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transac

DBUG_EXECUTE_IF("TxnManager.prepare_txn.random_failed", {
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
LOG_WARNING("TxnManager.prepare_txn.random_failed random failed");
LOG_WARNING("TxnManager.prepare_txn.random_failed random failed")
.tag("txn_id", transaction_id)
.tag("tablet_id", tablet_id);
return Status::InternalError("debug prepare txn random failed");
}
});
DBUG_EXECUTE_IF("TxnManager.prepare_txn.wait", {
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
LOG_WARNING("TxnManager.prepare_txn.wait").tag("wait ms", wait);
LOG_WARNING("TxnManager.prepare_txn.wait")
.tag("txn_id", transaction_id)
.tag("tablet_id", tablet_id)
.tag("wait ms", wait);
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});
Expand Down Expand Up @@ -274,13 +279,18 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,

DBUG_EXECUTE_IF("TxnManager.commit_txn.random_failed", {
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
LOG_WARNING("TxnManager.commit_txn.random_failed");
LOG_WARNING("TxnManager.commit_txn.random_failed")
.tag("txn_id", transaction_id)
.tag("tablet_id", tablet_id);
return Status::InternalError("debug commit txn random failed");
}
});
DBUG_EXECUTE_IF("TxnManager.commit_txn.wait", {
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
LOG_WARNING("TxnManager.commit_txn.wait").tag("wait ms", wait);
LOG_WARNING("TxnManager.commit_txn.wait")
.tag("txn_id", transaction_id)
.tag("tablet_id", tablet_id)
.tag("wait ms", wait);
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});
Expand Down Expand Up @@ -347,7 +357,10 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
rowset_ptr->rowset_meta()->get_rowset_pb(), false);
DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", {
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
LOG_WARNING("TxnManager.RowsetMetaManager.save_wait").tag("wait ms", wait);
LOG_WARNING("TxnManager.RowsetMetaManager.save_wait")
.tag("txn_id", transaction_id)
.tag("tablet_id", tablet_id)
.tag("wait ms", wait);
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});
Expand Down Expand Up @@ -426,13 +439,18 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
}
DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", {
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta");
LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta")
.tag("txn_id", transaction_id)
.tag("tablet_id", tablet_id);
return Status::InternalError("debug publish txn before save rs meta random failed");
}
});
DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_before_save_rs_meta", {
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta").tag("wait ms", wait);
LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta")
.tag("txn_id", transaction_id)
.tag("tablet_id", tablet_id)
.tag("wait ms", wait);
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});
Expand All @@ -446,13 +464,18 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,

DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta", {
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta");
LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta")
.tag("txn_id", transaction_id)
.tag("tablet_id", tablet_id);
return Status::InternalError("debug publish txn after save rs meta random failed");
}
});
DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_after_save_rs_meta", {
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta").tag("wait ms", wait);
LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta")
.tag("txn_id", transaction_id)
.tag("tablet_id", tablet_id)
.tag("wait ms", wait);
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,10 +581,12 @@ private void checkCommitStatus(List<Table> tableList, TransactionState transacti
String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
tabletVersionFailedReplicas);

String errMsg = String.format("Failed to commit txn %s, cause tablet %s succ replica "
+ "num %s < quorum replica num %s. table %s, partition %s, this tablet detail: %s",
String errMsg = String.format("Failed to commit txn %s, cause tablet %s succ replica num %s"
+ " < load required replica num %s. table %s, partition: [ id=%s, commit version %s"
+ ", visible version %s ], this tablet detail: %s",
transactionId, tablet.getId(), successReplicaNum, quorumReplicaNum, tableId,
partition.getId(), writeDetail);
partition.getId(), partition.getCommittedVersion(), partition.getVisibleVersion(),
writeDetail);
LOG.info(errMsg);

throw new TabletQuorumFailedException(transactionId, errMsg);
Expand Down Expand Up @@ -739,7 +741,7 @@ public void commitTransaction(List<Table> tableList, long transactionId, List<Ta
}

// update nextVersion because of the failure of persistent transaction resulting in error version
updateCatalogAfterCommitted(transactionState, db);
updateCatalogAfterCommitted(transactionState, db, false);
LOG.info("transaction:[{}] successfully committed", transactionState);
}

Expand Down Expand Up @@ -1723,8 +1725,10 @@ protected void checkRunningTxnExceedLimit(TransactionState.LoadJobSourceType sou
}
}

private void updateCatalogAfterCommitted(TransactionState transactionState, Database db) {
private void updateCatalogAfterCommitted(TransactionState transactionState, Database db, boolean isReplay) {
Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
List<Replica> tabletSuccReplicas = Lists.newArrayList();
List<Replica> tabletFailedReplicas = Lists.newArrayList();
for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) {
long tableId = tableCommitInfo.getTableId();
OlapTable table = (OlapTable) db.getTableNullable(tableId);
Expand All @@ -1747,13 +1751,32 @@ private void updateCatalogAfterCommitted(TransactionState transactionState, Data
for (MaterializedIndex index : allIndices) {
List<Tablet> tablets = index.getTablets();
for (Tablet tablet : tablets) {
tabletFailedReplicas.clear();
tabletSuccReplicas.clear();
for (Replica replica : tablet.getReplicas()) {
if (errorReplicaIds.contains(replica.getId())) {
// TODO(cmy): do we need to update last failed version here?
// because in updateCatalogAfterVisible, it will be updated again.
replica.updateLastFailedVersion(partitionCommitInfo.getVersion());
tabletFailedReplicas.add(replica);
} else {
tabletSuccReplicas.add(replica);
}
}
if (!isReplay && !tabletFailedReplicas.isEmpty()) {
LOG.info("some replicas load data failed for committed txn {} on version {}, table {}, "
+ "partition {}, tablet {}, {} replicas load data succ: {}, {} replicas load "
+ "data fail: {}",
transactionState.getTransactionId(), partitionCommitInfo.getVersion(),
tableId, partitionId, tablet.getId(), tabletSuccReplicas.size(),
Joiner.on(", ").join(tabletSuccReplicas.stream()
.map(replica -> replica.toStringSimple(true))
.collect(Collectors.toList())),
tabletFailedReplicas.size(),
Joiner.on(", ").join(tabletFailedReplicas.stream()
.map(replica -> replica.toStringSimple(true))
.collect(Collectors.toList())));
}
}
}
partition.setNextVersion(partition.getNextVersion() + 1);
Expand Down Expand Up @@ -1967,7 +1990,7 @@ public void replayUpsertTransactionState(TransactionState transactionState) thro
transactionState.replaySetTransactionStatus();
if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) {
LOG.info("replay a committed transaction {}", transactionState);
updateCatalogAfterCommitted(transactionState, db);
updateCatalogAfterCommitted(transactionState, db, true);
} else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
LOG.info("replay a visible transaction {}", transactionState);
updateCatalogAfterVisible(transactionState, db);
Expand Down

0 comments on commit be464d2

Please sign in to comment.