Skip to content

Commit

Permalink
[refactor](loadmemlimit) remove load memlimit since it is never used (a…
Browse files Browse the repository at this point in the history
…pache#39536)

It is needed to pick to branch 21, because we will depend on it to do
spill disk.

Co-authored-by: yiguolei <[email protected]>
  • Loading branch information
yiguolei and Doris-Extras committed Sep 9, 2024
1 parent 4b72a4c commit cacef8d
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 23 deletions.
29 changes: 23 additions & 6 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,13 +463,30 @@ Status RuntimeState::append_error_msg_to_file(std::function<std::string()> line,
return Status::OK();
}

int64_t RuntimeState::get_load_mem_limit() {
// TODO: the code is abandoned, it can be deleted after v1.3
if (_query_options.__isset.load_mem_limit && _query_options.load_mem_limit > 0) {
return _query_options.load_mem_limit;
} else {
return _query_mem_tracker->limit();
std::string RuntimeState::get_error_log_file_path() {
if (_s3_error_fs && _error_log_file && _error_log_file->is_open()) {
// close error log file
_error_log_file->close();
std::string error_log_absolute_path =
_exec_env->load_path_mgr()->get_load_error_absolute_path(_error_log_file_path);
// upload error log file to s3
Status st = _s3_error_fs->upload(error_log_absolute_path, _s3_error_log_file_path);
if (st.ok()) {
// remove local error log file
std::filesystem::remove(error_log_absolute_path);
} else {
// upload failed and return local error log file path
LOG(WARNING) << "Fail to upload error file to s3, error_log_file_path="
<< _error_log_file_path << ", error=" << st;
return _error_log_file_path;
}
// expiration must be less than a week (in seconds) for presigned url
static const unsigned EXPIRATION_SECONDS = 7 * 24 * 60 * 60 - 1;
// We should return a public endpoint to user.
_error_log_file_path = _s3_error_fs->generate_presigned_url(_s3_error_log_file_path,
EXPIRATION_SECONDS, true);
}
return _error_log_file_path;
}

void RuntimeState::resize_op_id_to_local_state(int operator_size) {
Expand Down
4 changes: 0 additions & 4 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,6 @@ class RuntimeState {

std::vector<TErrorTabletInfo>& error_tablet_infos() { return _error_tablet_infos; }

// get mem limit for load channel
// if load mem limit is not set, or is zero, using query mem limit instead.
int64_t get_load_mem_limit();

// local runtime filter mgr, the runtime filter do not have remote target or
// not need local merge should regist here. the instance exec finish, the local
// runtime filter mgr can release the memory of local runtime filter
Expand Down
2 changes: 0 additions & 2 deletions be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ void VNodeChannel::_open_internal(bool is_incremental) {

request->set_num_senders(_parent->_num_senders);
request->set_need_gen_rollup(false); // Useless but it is a required field in pb
request->set_load_mem_limit(_parent->_load_mem_limit);
request->set_load_channel_timeout_s(_parent->_load_channel_timeout_s);
request->set_is_high_priority(_parent->_is_high_priority);
request->set_sender_ip(BackendOptions::get_localhost());
Expand Down Expand Up @@ -1226,7 +1225,6 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
_max_wait_exec_timer = ADD_TIMER(profile, "MaxWaitExecTime");
_add_batch_number = ADD_COUNTER(profile, "NumberBatchAdded", TUnit::UNIT);
_num_node_channels = ADD_COUNTER(profile, "NumberNodeChannels", TUnit::UNIT);
_load_mem_limit = state->get_load_mem_limit();

#ifdef DEBUG
// check: tablet ids should be unique
Expand Down
3 changes: 0 additions & 3 deletions be/src/vec/sink/writer/vtablet_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -660,9 +660,6 @@ class VTabletWriter final : public AsyncResultWriter {
RuntimeProfile::Counter* _add_batch_number = nullptr;
RuntimeProfile::Counter* _num_node_channels = nullptr;

// load mem limit is for remote load channel
int64_t _load_mem_limit = -1;

// the timeout of load channels opened by this tablet sink. in second
int64_t _load_channel_timeout_s = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,6 @@ private void executeOnce() throws Exception {
curCoordinator.setExecMemoryLimit(execMemLimit);
curCoordinator.setExecPipEngine(Config.enable_pipeline_load);

/*
* For broker load job, user only need to set mem limit by 'exec_mem_limit' property.
* And the variable 'load_mem_limit' does not make any effect.
* However, in order to ensure the consistency of semantics when executing on the BE side,
* and to prevent subsequent modification from incorrectly setting the load_mem_limit,
* here we use exec_mem_limit to directly override the load_mem_limit property.
*/
curCoordinator.setLoadMemLimit(execMemLimit);
curCoordinator.setMemTableOnSinkNode(enableMemTableOnSinkNode);

long leftTimeMs = getLeftTimeMs();
Expand Down

0 comments on commit cacef8d

Please sign in to comment.