Skip to content

Commit

Permalink
[Enhancement](group commit) Add fault injection case for group commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian authored Apr 19, 2024
1 parent b9952bc commit 198496b
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 2 deletions.
11 changes: 10 additions & 1 deletion be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
RuntimeState* state) {
Status st;
Status result_status;
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status",
{ status = Status::InternalError(""); });
if (status.ok()) {
// commit txn
TLoadTxnCommitRequest request;
Expand Down Expand Up @@ -368,6 +370,13 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
},
10000L);
result_status = Status::create<false>(result.status);
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status", {
std ::string msg = "abort txn";
LOG(INFO) << "debug promise set: " << msg;
ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.set_value(
Status ::InternalError(msg));
return status;
});
}
std::shared_ptr<LoadBlockQueue> load_block_queue;
{
Expand All @@ -392,7 +401,7 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
// status: exec_plan_fragment result
// st: commit txn rpc status
// result_status: commit txn result
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status",
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_st",
{ st = Status::InternalError(""); });
if (status.ok() && st.ok() &&
(result_status.ok() || result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
exception = false;
try {
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status")
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_st")
streamLoad {
table "${tableName}"
set 'column_separator', ','
Expand All @@ -88,6 +88,44 @@ suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains('estimated wal bytes 0 Bytes'))
exception = true;
} finally {
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_st")
assertTrue(exception)
}

// test group commit abort txn
sql """ DROP TABLE IF EXISTS ${tableName} """

sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`k` int ,
`v` int ,
) engine=olap
DISTRIBUTED BY HASH(`k`)
BUCKETS 5
properties("replication_num" = "1")
"""

GetDebugPoint().clearDebugPointsForAllBEs()

exception = false;
try {
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status")
streamLoad {
table "${tableName}"
set 'column_separator', ','
set 'group_commit', 'async_mode'
unset 'label'
file 'group_commit_wal_msg.csv'
time 10000
}
assertFalse(true);
} catch (Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains('abort txn'))
exception = true;
} finally {
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status")
Expand Down

0 comments on commit 198496b

Please sign in to comment.