Skip to content

Commit

Permalink
[improvement](spill) spill trigger improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg committed Mar 25, 2024
1 parent 645a4c4 commit d29b975
Show file tree
Hide file tree
Showing 22 changed files with 373 additions and 58 deletions.
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,9 @@ DEFINE_mInt32(memory_gc_sleep_time_ms, "1000");
// Sleep time in milliseconds between memtable flush mgr refresh iterations
DEFINE_mInt64(memtable_mem_tracker_refresh_interval_ms, "5");

// Sleep time in milliseconds between workload group used memory refresh iterations
DEFINE_mInt64(wg_mem_refresh_interval_ms, "50");

// percent of (active memtables size / all memtables size) when reach hard limit
DEFINE_mInt32(memtable_hard_limit_active_percent, "50");

Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,9 @@ DECLARE_mInt32(memory_gc_sleep_time_ms);
// Sleep time in milliseconds between memtable flush mgr memory refresh iterations
DECLARE_mInt64(memtable_mem_tracker_refresh_interval_ms);

// Sleep time in milliseconds between workload group used memory refresh iterations
DECLARE_mInt64(wg_mem_refresh_interval_ms);

// percent of (active memtables size / all memtables size) when reach hard limit
DECLARE_mInt32(memtable_hard_limit_active_percent);

Expand Down
13 changes: 13 additions & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,14 @@ void Daemon::je_purge_dirty_pages_thread() const {
} while (true);
}

// Background loop that keeps workload-group memory usage/limit info fresh.
// Each iteration waits on the stop latch for `config::wg_mem_refresh_interval_ms`
// milliseconds; the loop ends as soon as that wait reports true, otherwise the
// workload group manager is asked to refresh its memory info.
void Daemon::wg_mem_used_refresh_thread() {
    for (;;) {
        // Re-read the config on every pass so a runtime change of the interval
        // is picked up by the next wait.
        const auto refresh_interval =
                std::chrono::milliseconds(config::wg_mem_refresh_interval_ms);
        if (_stop_background_threads_latch.wait_for(refresh_interval)) {
            break;
        }
        doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_memory_info();
    }
}

void Daemon::start() {
Status st;
st = Thread::create(
Expand Down Expand Up @@ -413,6 +421,11 @@ void Daemon::start() {
"Daemon", "query_runtime_statistics_thread",
[this]() { this->report_runtime_query_statistics_thread(); }, &_threads.emplace_back());
CHECK(st.ok()) << st;

st = Thread::create(
"Daemon", "wg_mem_refresh_thread", [this]() { this->wg_mem_used_refresh_thread(); },
&_threads.emplace_back());
CHECK(st.ok()) << st;
}

void Daemon::stop() {
Expand Down
1 change: 1 addition & 0 deletions be/src/common/daemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Daemon {
void calculate_metrics_thread();
void je_purge_dirty_pages_thread() const;
void report_runtime_query_statistics_thread();
void wg_mem_used_refresh_thread();

CountDownLatch _stop_background_threads_latch;
std::vector<scoped_refptr<Thread>> _threads;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status exec_stat
_spill_cv.wait(lk);
}
}
dec_running_big_mem_op_num(state);
return Base::close(state, exec_status);
}

Expand Down Expand Up @@ -159,6 +160,7 @@ Status PartitionedAggSinkOperatorX::close(RuntimeState* state) {
Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block,
bool eos) {
auto& local_state = get_local_state(state);
local_state.inc_running_big_mem_op_num(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
RETURN_IF_ERROR(local_state.Base::_shared_state->sink_status);
Expand All @@ -171,6 +173,7 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
RETURN_IF_ERROR(revoke_memory(state));
} else {
local_state._dependency->set_ready_to_read();
local_state._finish_dependency->set_ready();
}
}
if (local_state._runtime_state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ Status PartitionedAggLocalState::close(RuntimeState* state) {
_merge_spill_cv.wait(lk);
}
}
dec_running_big_mem_op_num(state);
return Base::close(state);
}
PartitionedAggSourceOperatorX::PartitionedAggSourceOperatorX(ObjectPool* pool,
Expand Down Expand Up @@ -128,6 +129,7 @@ Status PartitionedAggSourceOperatorX::close(RuntimeState* state) {
Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
bool* eos) {
auto& local_state = get_local_state(state);
local_state.inc_running_big_mem_op_num(state);
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(local_state._status);

Expand Down
38 changes: 21 additions & 17 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "partitioned_hash_join_probe_operator.h"

#include "pipeline/pipeline_x/pipeline_x_task.h"
#include "util/mem_info.h"
#include "vec/spill/spill_stream_manager.h"

Expand Down Expand Up @@ -148,6 +149,10 @@ Status PartitionedHashJoinProbeLocalState::open(RuntimeState* state) {
return _partitioner->open(state);
}
// Closes the probe-side local state. When the state is already marked closed
// this is a no-op returning OK; otherwise the operator is unregistered from the
// query's running big-memory-operator count before the base join-probe close
// runs, and any error from the base close is propagated.
Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
    if (!_closed) {
        dec_running_big_mem_op_num(state);
        RETURN_IF_ERROR(JoinProbeLocalState::close(state));
    }
    return Status::OK();
}
Expand Down Expand Up @@ -196,6 +201,7 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
std::unique_lock<std::mutex> lock(_spill_lock);
_dependency->set_ready();
}
LOG(INFO) << "hash probe " << _parent->id() << " revoke memory spill_build_block finish";
});
}

Expand Down Expand Up @@ -250,13 +256,17 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
std::unique_lock<std::mutex> lock(_spill_lock);
_dependency->set_ready();
}
LOG(INFO) << "hash probe " << _parent->id()
<< " revoke memory spill_probe_blocks finish";
});
} else {
--_spilling_task_count;
if (_spilling_task_count == 0) {
std::unique_lock<std::mutex> lock(_spill_lock);
_dependency->set_ready();
}
LOG(INFO) << "hash probe " << _parent->id()
<< " revoke memory spill_probe_blocks no data to spill";
}
return Status::OK();
}
Expand Down Expand Up @@ -462,6 +472,7 @@ Status PartitionedHashJoinProbeOperatorX::open(RuntimeState* state) {
Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* input_block,
bool eos) const {
auto& local_state = get_local_state(state);
local_state.inc_running_big_mem_op_num(state);
const auto rows = input_block->rows();
auto& partitioned_blocks = local_state._partitioned_blocks;
if (rows == 0) {
Expand Down Expand Up @@ -666,17 +677,17 @@ size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state
auto& probe_blocks = local_state._probe_blocks;
for (uint32_t i = spilling_start; i < _partition_count; ++i) {
auto& build_block = partitioned_build_blocks[i];
if (build_block && build_block->rows() >= state->batch_size()) {
mem_size += build_block->allocated_bytes();
if (build_block && build_block->rows() > 0) {
mem_size += build_block->bytes();
}

for (auto& block : probe_blocks[i]) {
mem_size += block.allocated_bytes();
mem_size += block.bytes();
}

auto& partitioned_block = local_state._partitioned_blocks[i];
if (partitioned_block && partitioned_block->rows() >= state->batch_size()) {
mem_size += partitioned_block->allocated_bytes();
if (partitioned_block && partitioned_block->rows() > 0) {
mem_size += partitioned_block->bytes();
}
}
return mem_size;
Expand All @@ -694,6 +705,7 @@ Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, bo
return Status::OK();
}

LOG(INFO) << "hash probe " << id() << " revoke memory";
for (uint32_t i = spilling_start; i < _partition_count; ++i) {
RETURN_IF_ERROR(local_state.spill_build_block(state, i));
RETURN_IF_ERROR(local_state.spill_probe_blocks(state, i));
Expand All @@ -711,22 +723,14 @@ Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, bo

bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* state) const {
auto& local_state = get_local_state(state);

const auto revocable_size = revocable_mem_size(state);
if (PipelineXTask::should_revoke_memory(state, revocable_size)) {
return true;
}
if (local_state._shared_state->need_to_spill) {
const auto revocable_size = revocable_mem_size(state);
const auto min_revocable_size = state->min_revocable_mem();
return revocable_size > min_revocable_size;
}

auto sys_mem_available = MemInfo::sys_mem_available();
auto sys_mem_warning_water_mark = doris::MemInfo::sys_mem_available_warning_water_mark();

if (sys_mem_available <
sys_mem_warning_water_mark * config::spill_mem_warning_water_mark_multiplier) {
const auto revocable_size = revocable_mem_size(state);
const auto min_revocable_size = state->min_revocable_mem();
return min_revocable_size > 0 && revocable_size > min_revocable_size;
}
return false;
}

Expand Down
18 changes: 15 additions & 3 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,25 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(PipelineXSinkLocalState::open(state));
return _partitioner->open(state);
}
// Closes the build-side (sink) local state. Both the exec-time and close
// timers cover the whole call. If the base state is already closed, returns OK
// without doing anything; otherwise unregisters this operator from the query's
// running big-memory-operator count and delegates to the base sink close.
Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec_status) {
    SCOPED_TIMER(PipelineXSinkLocalState::exec_time_counter());
    SCOPED_TIMER(PipelineXSinkLocalState::_close_timer);
    if (!PipelineXSinkLocalState::_closed) {
        dec_running_big_mem_op_num(state);
        return PipelineXSinkLocalState::close(state, exec_status);
    }
    return Status::OK();
}

Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
LOG(INFO) << "revoke_memory, node id: " << _parent->id() << ", eos: " << _child_eos;
DCHECK_EQ(_spilling_streams_count, 0);
_spilling_streams_count = _shared_state->partitioned_build_blocks.size();
for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); ++i) {
vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i];
auto& mutable_block = _shared_state->partitioned_build_blocks[i];

if (!mutable_block || mutable_block->rows() < state->batch_size()) {
if (!mutable_block || mutable_block->rows() == 0) {
--_spilling_streams_count;
continue;
}
Expand Down Expand Up @@ -127,6 +137,7 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
_dependency->set_ready_to_read();
}
}
LOG(INFO) << "revoke_memory finish, node id: " << _parent->id() << ", eos: " << _child_eos;
}

PartitionedHashJoinSinkOperatorX::PartitionedHashJoinSinkOperatorX(
Expand Down Expand Up @@ -169,6 +180,7 @@ Status PartitionedHashJoinSinkOperatorX::open(RuntimeState* state) {
Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
bool eos) {
auto& local_state = get_local_state(state);
local_state.inc_running_big_mem_op_num(state);
SCOPED_TIMER(local_state.exec_time_counter());
if (!local_state._spill_status_ok) {
DCHECK_NE(local_state._spill_status.code(), 0);
Expand Down Expand Up @@ -236,8 +248,8 @@ size_t PartitionedHashJoinSinkOperatorX::revocable_mem_size(RuntimeState* state)
size_t mem_size = 0;
for (uint32_t i = 0; i != _partition_count; ++i) {
auto& block = partitioned_blocks[i];
if (block && block->rows() >= state->batch_size()) {
mem_size += block->allocated_bytes();
if (block && block->rows() > 0) {
mem_size += block->bytes();
}
}
return mem_size;
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class PartitionedHashJoinSinkLocalState
~PartitionedHashJoinSinkLocalState() override = default;
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
Status revoke_memory(RuntimeState* state);

protected:
Expand All @@ -62,6 +63,7 @@ class PartitionedHashJoinSinkLocalState
std::mutex _spill_lock;

bool _child_eos {false};
bool _is_running {false};

Status _spill_status;
std::mutex _spill_status_lock;
Expand Down
8 changes: 8 additions & 0 deletions be/src/pipeline/exec/spill_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ Status SpillSortSinkLocalState::close(RuntimeState* state, Status execsink_statu
_spill_cv.wait(lk);
}
}
auto& parent = Base::_parent->template cast<Parent>();
if (parent._enable_spill) {
dec_running_big_mem_op_num(state);
}
return Status::OK();
}
Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) {
Expand Down Expand Up @@ -161,6 +165,9 @@ size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block,
bool eos) {
auto& local_state = get_local_state(state);
if (_enable_spill) {
local_state.inc_running_big_mem_op_num(state);
}
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(local_state.Base::_shared_state->sink_status);
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
Expand All @@ -177,6 +184,7 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc
RETURN_IF_ERROR(revoke_memory(state));
} else {
local_state._dependency->set_ready_to_read();
local_state._finish_dependency->set_ready();
}
} else {
RETURN_IF_ERROR(
Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/spill_sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ Status SpillSortLocalState::close(RuntimeState* state) {
_merge_spill_cv.wait(lk);
}
}
if (Base::_shared_state->enable_spill) {
dec_running_big_mem_op_num(state);
}
RETURN_IF_ERROR(Base::close(state));
for (auto& stream : _current_merging_streams) {
(void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
Expand Down Expand Up @@ -242,6 +245,9 @@ Status SpillSortSourceOperatorX::close(RuntimeState* state) {
Status SpillSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
bool* eos) {
auto& local_state = get_local_state(state);
if (local_state.Base::_shared_state->enable_spill) {
local_state.inc_running_big_mem_op_num(state);
}
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(local_state._status);

Expand Down
40 changes: 40 additions & 0 deletions be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,29 @@ class PipelineXLocalState : public PipelineXLocalStateBase {
return _dependency ? std::vector<Dependency*> {_dependency} : std::vector<Dependency*> {};
}

// Registers this local state with the query context's running
// big-memory-operator counter. Safe to call repeatedly: only the first call
// increments the counter, later calls return immediately.
void inc_running_big_mem_op_num(RuntimeState* state) {
    if (_big_mem_op_num_added) {
        return;
    }
    state->get_query_ctx()->inc_running_big_mem_op_num();
    _big_mem_op_num_added = true;
}

// Unregisters this local state from the query context's running
// big-memory-operator counter, balancing inc_running_big_mem_op_num().
//
// The decrement happens at most once, and only if this local state previously
// incremented the counter. The earlier guard was inverted (`!_big_mem_op_num_added`),
// which decremented for states that never incremented and never decremented
// for states that did, leaving the query-level count wrong in both directions.
void dec_running_big_mem_op_num(RuntimeState* state) {
    if (_big_mem_op_num_added && !_big_mem_op_num_deced) {
        state->get_query_ctx()->dec_running_big_mem_op_num();
        _big_mem_op_num_deced = true;
    }
}

protected:
Dependency* _dependency = nullptr;
SharedStateArg* _shared_state = nullptr;

private:
bool _big_mem_op_num_added = false;
bool _big_mem_op_num_deced = false;
};

template <typename SharedStateArg>
Expand Down Expand Up @@ -666,9 +686,29 @@ class PipelineXSinkLocalState : public PipelineXSinkLocalStateBase {
return _dependency ? std::vector<Dependency*> {_dependency} : std::vector<Dependency*> {};
}

// Registers this sink local state with the query context's running
// big-memory-operator counter. Safe to call repeatedly: only the first call
// increments the counter, later calls return immediately.
void inc_running_big_mem_op_num(RuntimeState* state) {
    if (_big_mem_op_num_added) {
        return;
    }
    state->get_query_ctx()->inc_running_big_mem_op_num();
    _big_mem_op_num_added = true;
}

// Unregisters this sink local state from the query context's running
// big-memory-operator counter, balancing inc_running_big_mem_op_num().
//
// The decrement happens at most once, and only if this local state previously
// incremented the counter. The earlier guard was inverted (`!_big_mem_op_num_added`),
// which decremented for states that never incremented and never decremented
// for states that did, leaving the query-level count wrong in both directions.
void dec_running_big_mem_op_num(RuntimeState* state) {
    if (_big_mem_op_num_added && !_big_mem_op_num_deced) {
        state->get_query_ctx()->dec_running_big_mem_op_num();
        _big_mem_op_num_deced = true;
    }
}

protected:
Dependency* _dependency = nullptr;
SharedStateType* _shared_state = nullptr;

private:
bool _big_mem_op_num_added = false;
bool _big_mem_op_num_deced = false;
};

template <typename SharedStateArg>
Expand Down
Loading

0 comments on commit d29b975

Please sign in to comment.