diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 90b3dd6da0b1d0f..5c31abe5007238a 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -495,13 +495,9 @@ bool LoadBlockQueue::is_wal_disk_space_enough( size_t content_length = 0; Status st = ExecEnv::GetInstance()->group_commit_mgr()->get_load_info(load_id, &content_length); if (st.ok()) { - st = ExecEnv::GetInstance()->group_commit_mgr()->remove_load_info(load_id); - if (!st.ok()) { - LOG(WARNING) << "failed to remove load info!"; - } + RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->remove_load_info(load_id)); } else { - // for insert into - is_blocks_contain_all_load_data = true; + return Status::InternalError("can not find load id."); } size_t pre_allocated = is_blocks_contain_all_load_data ? blocks_size @@ -528,10 +524,9 @@ bool LoadBlockQueue::is_wal_disk_space_enough( Status GroupCommitMgr::update_load_info(TUniqueId load_id, size_t content_length) { std::unique_lock l(_load_info_lock); - if (_load_id_to_content_length_map.find(load_id) != _load_id_to_content_length_map.end()) { - return Status::InternalError("load id already exists!"); + if (_load_id_to_content_length_map.find(load_id) == _load_id_to_content_length_map.end()) { + _load_id_to_content_length_map.insert(std::make_pair(load_id, content_length)); } - _load_id_to_content_length_map.insert(std::make_pair(load_id, content_length)); return Status::OK(); } @@ -539,8 +534,9 @@ Status GroupCommitMgr::get_load_info(TUniqueId load_id, size_t* content_length) std::shared_lock l(_load_info_lock); if (_load_id_to_content_length_map.find(load_id) != _load_id_to_content_length_map.end()) { *content_length = _load_id_to_content_length_map[load_id]; + return Status::OK(); } - return Status::OK(); + return Status::InternalError("can not find load id!"); } Status GroupCommitMgr::remove_load_info(TUniqueId load_id) { diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index 80e8be7f9697a1e..bc8f8db0e21ac15 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -23,6 +23,7 @@ #include #include +#include "common/exception.h" #include "runtime/exec_env.h" #include "runtime/group_commit_mgr.h" #include "runtime/runtime_state.h" @@ -68,6 +69,8 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) { Status GroupCommitBlockSink::prepare(RuntimeState* state) { RETURN_IF_ERROR(DataSink::prepare(state)); + RETURN_IF_ERROR( + ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(_load_id.to_thrift(), 0)); _state = state; // profile must add to state's object pool