Skip to content

Commit

Permalink
4
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian committed Dec 23, 2023
1 parent fc0ad82 commit 5ee0e28
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
16 changes: 6 additions & 10 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,13 +495,9 @@ bool LoadBlockQueue::is_wal_disk_space_enough(
size_t content_length = 0;
Status st = ExecEnv::GetInstance()->group_commit_mgr()->get_load_info(load_id, &content_length);
if (st.ok()) {
st = ExecEnv::GetInstance()->group_commit_mgr()->remove_load_info(load_id);
if (!st.ok()) {
LOG(WARNING) << "failed to remove load info!";
}
RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->remove_load_info(load_id));
} else {
// for insert into
is_blocks_contain_all_load_data = true;
return Status::InternalError("can not find load id.");
}
size_t pre_allocated = is_blocks_contain_all_load_data
? blocks_size
Expand All @@ -528,19 +524,19 @@ bool LoadBlockQueue::is_wal_disk_space_enough(

Status GroupCommitMgr::update_load_info(TUniqueId load_id, size_t content_length) {
std::unique_lock l(_load_info_lock);
if (_load_id_to_content_length_map.find(load_id) != _load_id_to_content_length_map.end()) {
return Status::InternalError("load id already exists!");
if (_load_id_to_content_length_map.find(load_id) == _load_id_to_content_length_map.end()) {
_load_id_to_content_length_map.insert(std::make_pair(load_id, content_length));
}
_load_id_to_content_length_map.insert(std::make_pair(load_id, content_length));
return Status::OK();
}

Status GroupCommitMgr::get_load_info(TUniqueId load_id, size_t* content_length) {
std::shared_lock l(_load_info_lock);
if (_load_id_to_content_length_map.find(load_id) != _load_id_to_content_length_map.end()) {
*content_length = _load_id_to_content_length_map[load_id];
return Status::OK();
}
return Status::OK();
return Status::InternalError("can not find load id!");
}

Status GroupCommitMgr::remove_load_info(TUniqueId load_id) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/sink/group_commit_block_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <mutex>
#include <shared_mutex>

#include "common/exception.h"
#include "runtime/exec_env.h"
#include "runtime/group_commit_mgr.h"
#include "runtime/runtime_state.h"
Expand Down Expand Up @@ -68,6 +69,8 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) {

Status GroupCommitBlockSink::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSink::prepare(state));
RETURN_IF_ERROR(
ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(_load_id.to_thrift(), 0));
_state = state;

// profile must add to state's object pool
Expand Down

0 comments on commit 5ee0e28

Please sign in to comment.