Skip to content

Commit

Permalink
[improvement](spill) spill trigger improvement (#32641)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg authored and Doris-Extras committed Apr 10, 2024
1 parent b0b5f84 commit 517c124
Show file tree
Hide file tree
Showing 24 changed files with 382 additions and 74 deletions.
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,9 @@ DEFINE_mInt32(memory_gc_sleep_time_ms, "1000");
// Sleep time in milliseconds between memtbale flush mgr refresh iterations
DEFINE_mInt64(memtable_mem_tracker_refresh_interval_ms, "5");

// Sleep time in milliseconds between refresh iterations of workload group memory statistics
DEFINE_mInt64(wg_mem_refresh_interval_ms, "50");

// percent of (active memtables size / all memtables size) when reach hard limit
DEFINE_mInt32(memtable_hard_limit_active_percent, "50");

Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,9 @@ DECLARE_mInt32(memory_gc_sleep_time_ms);
// Sleep time in milliseconds between memtbale flush mgr memory refresh iterations
DECLARE_mInt64(memtable_mem_tracker_refresh_interval_ms);

// Sleep time in milliseconds between refresh iterations of workload group memory statistics
DECLARE_mInt64(wg_mem_refresh_interval_ms);

// percent of (active memtables size / all memtables size) when reach hard limit
DECLARE_mInt32(memtable_hard_limit_active_percent);

Expand Down
13 changes: 13 additions & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,14 @@ void Daemon::je_purge_dirty_pages_thread() const {
} while (true);
}

void Daemon::wg_mem_used_refresh_thread() {
// Refresh memory usage and limit of workload groups
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::wg_mem_refresh_interval_ms))) {
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_memory_info();
}
}

void Daemon::start() {
Status st;
st = Thread::create(
Expand Down Expand Up @@ -402,6 +410,11 @@ void Daemon::start() {
"Daemon", "query_runtime_statistics_thread",
[this]() { this->report_runtime_query_statistics_thread(); }, &_threads.emplace_back());
CHECK(st.ok()) << st;

st = Thread::create(
"Daemon", "wg_mem_refresh_thread", [this]() { this->wg_mem_used_refresh_thread(); },
&_threads.emplace_back());
CHECK(st.ok()) << st;
}

void Daemon::stop() {
Expand Down
1 change: 1 addition & 0 deletions be/src/common/daemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Daemon {
void calculate_metrics_thread();
void je_purge_dirty_pages_thread() const;
void report_runtime_query_statistics_thread();
void wg_mem_used_refresh_thread();

CountDownLatch _stop_background_threads_latch;
std::vector<scoped_refptr<Thread>> _threads;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status exec_stat
if (Base::_closed) {
return Status::OK();
}
dec_running_big_mem_op_num(state);
{
std::unique_lock<std::mutex> lk(_spill_lock);
if (_is_spilling) {
Expand Down Expand Up @@ -159,6 +160,7 @@ Status PartitionedAggSinkOperatorX::close(RuntimeState* state) {
Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block,
bool eos) {
auto& local_state = get_local_state(state);
local_state.inc_running_big_mem_op_num(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
RETURN_IF_ERROR(local_state.Base::_shared_state->sink_status);
Expand All @@ -174,6 +176,7 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
RETURN_IF_ERROR(partition->finish_current_spilling(eos));
}
local_state._dependency->set_ready_to_read();
local_state._finish_dependency->set_ready();
}
}
if (local_state._runtime_state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ Status PartitionedAggLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
dec_running_big_mem_op_num(state);
{
std::unique_lock<std::mutex> lk(_merge_spill_lock);
if (_is_merging) {
Expand Down Expand Up @@ -128,6 +129,7 @@ Status PartitionedAggSourceOperatorX::close(RuntimeState* state) {
Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
bool* eos) {
auto& local_state = get_local_state(state);
local_state.inc_running_big_mem_op_num(state);
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(local_state._status);

Expand Down
50 changes: 30 additions & 20 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "partitioned_hash_join_probe_operator.h"

#include "pipeline/pipeline_x/pipeline_x_task.h"
#include "util/mem_info.h"
#include "vec/spill/spill_stream_manager.h"

Expand Down Expand Up @@ -148,6 +149,10 @@ Status PartitionedHashJoinProbeLocalState::open(RuntimeState* state) {
return _partitioner->open(state);
}
Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
dec_running_big_mem_op_num(state);
RETURN_IF_ERROR(JoinProbeLocalState::close(state));
return Status::OK();
}
Expand All @@ -156,7 +161,8 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
uint32_t partition_index) {
auto& partitioned_build_blocks = _shared_state->partitioned_build_blocks;
auto& mutable_block = partitioned_build_blocks[partition_index];
if (!mutable_block || mutable_block->rows() < state->batch_size()) {
if (!mutable_block ||
mutable_block->allocated_bytes() < vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
--_spilling_task_count;
return Status::OK();
}
Expand Down Expand Up @@ -201,6 +207,8 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
--_spilling_task_count;

if (_spilling_task_count == 0) {
LOG(INFO) << "hash probe " << _parent->id()
<< " revoke memory spill_build_block finish";
std::unique_lock<std::mutex> lock(_spill_lock);
_dependency->set_ready();
}
Expand All @@ -225,7 +233,8 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat

auto& blocks = _probe_blocks[partition_index];
auto& partitioned_block = _partitioned_blocks[partition_index];
if (partitioned_block && partitioned_block->rows() >= state->batch_size()) {
if (partitioned_block && partitioned_block->allocated_bytes() >=
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
blocks.emplace_back(partitioned_block->to_block());
partitioned_block.reset();
}
Expand Down Expand Up @@ -263,6 +272,8 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
--_spilling_task_count;

if (_spilling_task_count == 0) {
LOG(INFO) << "hash probe " << _parent->id()
<< " revoke memory spill_probe_blocks finish";
std::unique_lock<std::mutex> lock(_spill_lock);
_dependency->set_ready();
}
Expand Down Expand Up @@ -304,8 +315,6 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
auto& spilled_stream = _shared_state->spilled_streams[partition_index];
has_data = false;
if (!spilled_stream) {
LOG(INFO) << "no data need to recovery for partition: " << partition_index
<< ", node id: " << _parent->id() << ", task id: " << state->task_id();
return Status::OK();
}

Expand Down Expand Up @@ -492,6 +501,7 @@ Status PartitionedHashJoinProbeOperatorX::open(RuntimeState* state) {
Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* input_block,
bool eos) const {
auto& local_state = get_local_state(state);
local_state.inc_running_big_mem_op_num(state);
const auto rows = input_block->rows();
auto& partitioned_blocks = local_state._partitioned_blocks;
if (rows == 0) {
Expand Down Expand Up @@ -694,17 +704,23 @@ size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state
auto& probe_blocks = local_state._probe_blocks;
for (uint32_t i = spilling_start; i < _partition_count; ++i) {
auto& build_block = partitioned_build_blocks[i];
if (build_block && build_block->rows() >= state->batch_size()) {
mem_size += build_block->allocated_bytes();
if (build_block) {
auto block_bytes = build_block->allocated_bytes();
if (block_bytes >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
mem_size += build_block->allocated_bytes();
}
}

for (auto& block : probe_blocks[i]) {
mem_size += block.allocated_bytes();
}

auto& partitioned_block = local_state._partitioned_blocks[i];
if (partitioned_block && partitioned_block->rows() >= state->batch_size()) {
mem_size += partitioned_block->allocated_bytes();
if (partitioned_block) {
auto block_bytes = partitioned_block->allocated_bytes();
if (block_bytes >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
mem_size += block_bytes;
}
}
}
return mem_size;
Expand All @@ -722,6 +738,8 @@ Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, bo
return Status::OK();
}

LOG(INFO) << "hash probe " << id()
<< " revoke memory, spill task count: " << local_state._spilling_task_count;
for (uint32_t i = spilling_start; i < _partition_count; ++i) {
RETURN_IF_ERROR(local_state.spill_build_block(state, i));
RETURN_IF_ERROR(local_state.spill_probe_blocks(state, i));
Expand All @@ -739,22 +757,14 @@ Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, bo

bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* state) const {
auto& local_state = get_local_state(state);

const auto revocable_size = revocable_mem_size(state);
if (PipelineXTask::should_revoke_memory(state, revocable_size)) {
return true;
}
if (local_state._shared_state->need_to_spill) {
const auto revocable_size = revocable_mem_size(state);
const auto min_revocable_size = state->min_revocable_mem();
return revocable_size > min_revocable_size;
}

auto sys_mem_available = MemInfo::sys_mem_available();
auto sys_mem_warning_water_mark = doris::MemInfo::sys_mem_available_warning_water_mark();

if (sys_mem_available <
sys_mem_warning_water_mark * config::spill_mem_warning_water_mark_multiplier) {
const auto revocable_size = revocable_mem_size(state);
const auto min_revocable_size = state->min_revocable_mem();
return min_revocable_size > 0 && revocable_size > min_revocable_size;
}
return false;
}

Expand Down
28 changes: 22 additions & 6 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,27 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(PipelineXSinkLocalState::open(state));
return _partitioner->open(state);
}
Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec_status) {
SCOPED_TIMER(PipelineXSinkLocalState::exec_time_counter());
SCOPED_TIMER(PipelineXSinkLocalState::_close_timer);
if (PipelineXSinkLocalState::_closed) {
return Status::OK();
}
dec_running_big_mem_op_num(state);
return PipelineXSinkLocalState::close(state, exec_status);
}

Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
LOG(INFO) << "hash join sink " << _parent->id() << " revoke_memory"
<< ", eos: " << _child_eos;
DCHECK_EQ(_spilling_streams_count, 0);
_spilling_streams_count = _shared_state->partitioned_build_blocks.size();
for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); ++i) {
vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i];
auto& mutable_block = _shared_state->partitioned_build_blocks[i];

if (!mutable_block || mutable_block->rows() < state->batch_size()) {
if (!mutable_block ||
mutable_block->allocated_bytes() < vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
--_spilling_streams_count;
continue;
}
Expand Down Expand Up @@ -99,7 +111,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
if (_spilling_streams_count > 0) {
_dependency->block();
} else if (_child_eos) {
LOG(INFO) << "sink eos, set_ready_to_read, node id: " << _parent->id()
LOG(INFO) << "hash join sink " << _parent->id() << " set_ready_to_read"
<< ", task id: " << state->task_id();
_dependency->set_ready_to_read();
}
Expand Down Expand Up @@ -129,7 +141,7 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
std::unique_lock<std::mutex> lock(_spill_lock);
_dependency->set_ready();
if (_child_eos) {
LOG(INFO) << "sink eos, set_ready_to_read, node id: " << _parent->id()
LOG(INFO) << "hash join sink " << _parent->id() << " set_ready_to_read"
<< ", task id: " << state()->task_id();
_dependency->set_ready_to_read();
}
Expand Down Expand Up @@ -176,6 +188,7 @@ Status PartitionedHashJoinSinkOperatorX::open(RuntimeState* state) {
Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
bool eos) {
auto& local_state = get_local_state(state);
local_state.inc_running_big_mem_op_num(state);
SCOPED_TIMER(local_state.exec_time_counter());
if (!local_state._spill_status_ok) {
DCHECK_NE(local_state._spill_status.code(), 0);
Expand Down Expand Up @@ -227,7 +240,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
}

if (eos) {
LOG(INFO) << "sink eos, set_ready_to_read, node id: " << id()
LOG(INFO) << "hash join sink " << id() << " sink eos, set_ready_to_read"
<< ", task id: " << state->task_id();
local_state._dependency->set_ready_to_read();
}
Expand All @@ -243,8 +256,11 @@ size_t PartitionedHashJoinSinkOperatorX::revocable_mem_size(RuntimeState* state)
size_t mem_size = 0;
for (uint32_t i = 0; i != _partition_count; ++i) {
auto& block = partitioned_blocks[i];
if (block && block->rows() >= state->batch_size()) {
mem_size += block->allocated_bytes();
if (block) {
auto block_bytes = block->allocated_bytes();
if (block_bytes >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
mem_size += block_bytes;
}
}
}
return mem_size;
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class PartitionedHashJoinSinkLocalState
~PartitionedHashJoinSinkLocalState() override = default;
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
Status revoke_memory(RuntimeState* state);

protected:
Expand Down
8 changes: 8 additions & 0 deletions be/src/pipeline/exec/spill_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ Status SpillSortSinkLocalState::close(RuntimeState* state, Status execsink_statu
_spill_cv.wait(lk);
}
}
auto& parent = Base::_parent->template cast<Parent>();
if (parent._enable_spill) {
dec_running_big_mem_op_num(state);
}
return Status::OK();
}
Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) {
Expand Down Expand Up @@ -160,6 +164,9 @@ size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block,
bool eos) {
auto& local_state = get_local_state(state);
if (_enable_spill) {
local_state.inc_running_big_mem_op_num(state);
}
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(local_state.Base::_shared_state->sink_status);
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
Expand All @@ -176,6 +183,7 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc
RETURN_IF_ERROR(revoke_memory(state));
} else {
local_state._dependency->set_ready_to_read();
local_state._finish_dependency->set_ready();
}
} else {
RETURN_IF_ERROR(
Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/spill_sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ Status SpillSortLocalState::close(RuntimeState* state) {
_merge_spill_cv.wait(lk);
}
}
if (Base::_shared_state->enable_spill) {
dec_running_big_mem_op_num(state);
}
RETURN_IF_ERROR(Base::close(state));
for (auto& stream : _current_merging_streams) {
(void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
Expand Down Expand Up @@ -249,6 +252,9 @@ Status SpillSortSourceOperatorX::close(RuntimeState* state) {
Status SpillSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
bool* eos) {
auto& local_state = get_local_state(state);
if (local_state.Base::_shared_state->enable_spill) {
local_state.inc_running_big_mem_op_num(state);
}
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(local_state._status);

Expand Down
Loading

0 comments on commit 517c124

Please sign in to comment.