[improvement](spill) spill trigger improvement #32641

Merged: 4 commits, Mar 29, 2024
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
@@ -568,6 +568,9 @@ DEFINE_mInt32(memory_gc_sleep_time_ms, "1000");
// Sleep time in milliseconds between memtbale flush mgr refresh iterations
DEFINE_mInt64(memtable_mem_tracker_refresh_interval_ms, "5");

// Sleep time in milliseconds between refresh iterations of workload group memory statistics
DEFINE_mInt64(wg_mem_refresh_interval_ms, "50");

// percent of (active memtables size / all memtables size) when reach hard limit
DEFINE_mInt32(memtable_hard_limit_active_percent, "50");

3 changes: 3 additions & 0 deletions be/src/common/config.h
@@ -621,6 +621,9 @@ DECLARE_mInt32(memory_gc_sleep_time_ms);
// Sleep time in milliseconds between memtbale flush mgr memory refresh iterations
DECLARE_mInt64(memtable_mem_tracker_refresh_interval_ms);

// Sleep time in milliseconds between refresh iterations of workload group memory statistics
DECLARE_mInt64(wg_mem_refresh_interval_ms);

// percent of (active memtables size / all memtables size) when reach hard limit
DECLARE_mInt32(memtable_hard_limit_active_percent);

13 changes: 13 additions & 0 deletions be/src/common/daemon.cpp
@@ -380,6 +380,14 @@ void Daemon::je_purge_dirty_pages_thread() const {
} while (true);
}

void Daemon::wg_mem_used_refresh_thread() {
// Refresh memory usage and limit of workload groups
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::wg_mem_refresh_interval_ms))) {
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_memory_info();
}
}

void Daemon::start() {
Status st;
st = Thread::create(
@@ -413,6 +421,11 @@ void Daemon::start() {
"Daemon", "query_runtime_statistics_thread",
[this]() { this->report_runtime_query_statistics_thread(); }, &_threads.emplace_back());
CHECK(st.ok()) << st;

st = Thread::create(
"Daemon", "wg_mem_refresh_thread", [this]() { this->wg_mem_used_refresh_thread(); },
&_threads.emplace_back());
CHECK(st.ok()) << st;
}

void Daemon::stop() {
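The two hunks above add a config knob (wg_mem_refresh_interval_ms) and a daemon thread that re-runs workload_group_mgr()->refresh_wg_memory_info() on that interval until the stop latch fires. Below is a minimal, self-contained sketch of that wake-up-or-stop loop using only the standard library; the PeriodicRefresher class and its members are illustrative stand-ins, not Doris's CountDownLatch/Thread API.

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Illustrative stand-in for the stop-latch + interval pattern used by
// Daemon::wg_mem_used_refresh_thread(); all names here are assumptions.
class PeriodicRefresher {
public:
    explicit PeriodicRefresher(int interval_ms)
            : _interval_ms(interval_ms), _worker([this] { run(); }) {}

    ~PeriodicRefresher() {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _stopped = true;
        }
        _cv.notify_all();
        _worker.join();
    }

private:
    void run() {
        std::unique_lock<std::mutex> lk(_mutex);
        // wait_for() returns false on timeout ("do one more refresh") and
        // returns true as soon as the destructor sets _stopped.
        while (!_cv.wait_for(lk, std::chrono::milliseconds(_interval_ms),
                             [this] { return _stopped; })) {
            refresh();
        }
    }

    // In Doris this is where refresh_wg_memory_info() would recompute
    // per-workload-group memory usage.
    void refresh() {}

    int _interval_ms;
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _stopped = false;
    std::thread _worker;
};

Constructing one PeriodicRefresher(config_interval_ms) and letting its destructor run mirrors how Daemon::start()/stop() register and then join the real thread.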
1 change: 1 addition & 0 deletions be/src/common/daemon.h
@@ -44,6 +44,7 @@ class Daemon {
void calculate_metrics_thread();
void je_purge_dirty_pages_thread() const;
void report_runtime_query_statistics_thread();
void wg_mem_used_refresh_thread();

CountDownLatch _stop_background_threads_latch;
std::vector<scoped_refptr<Thread>> _threads;
@@ -70,6 +70,7 @@ Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status exec_stat
if (Base::_closed) {
return Status::OK();
}
dec_running_big_mem_op_num(state);
{
std::unique_lock<std::mutex> lk(_spill_lock);
if (_is_spilling) {
@@ -159,6 +160,7 @@ Status PartitionedAggSinkOperatorX::close(RuntimeState* state) {
Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block,
bool eos) {
auto& local_state = get_local_state(state);
local_state.inc_running_big_mem_op_num(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
RETURN_IF_ERROR(local_state.Base::_shared_state->sink_status);
@@ -174,6 +176,7 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
RETURN_IF_ERROR(partition->finish_current_spilling(eos));
}
local_state._dependency->set_ready_to_read();
local_state._finish_dependency->set_ready();
}
}
if (local_state._runtime_state) {
@@ -88,6 +88,7 @@ Status PartitionedAggLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
dec_running_big_mem_op_num(state);
{
std::unique_lock<std::mutex> lk(_merge_spill_lock);
if (_is_merging) {
@@ -128,6 +129,7 @@ Status PartitionedAggSourceOperatorX::close(RuntimeState* state) {
Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
bool* eos) {
auto& local_state = get_local_state(state);
local_state.inc_running_big_mem_op_num(state);
Contributor comment: Why do we need to check the local state's count? Isn't checking at the operator level alone sufficient?

SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(local_state._status);

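The inc_running_big_mem_op_num()/dec_running_big_mem_op_num() calls added to these sink and source operators suggest a per-query counter of concurrently active spill-capable ("big memory") operators, which is what the reviewer's question above is probing. A purely hypothetical, self-contained sketch of such a counter and one way a spill trigger could use it follows; QueryContextSketch, the budgeting rule, and the assumption that the local state increments only once per operator instance are all illustrative, not the actual Doris API.

#include <atomic>
#include <cstdint>

// Hypothetical stand-in for the query-level context that tracks how many
// spill-capable ("big memory") operators are currently running.
struct QueryContextSketch {
    std::atomic<int32_t> running_big_mem_ops{0};
    int64_t query_mem_limit = 0;  // bytes
};

inline void inc_running_big_mem_op(QueryContextSketch& ctx) {
    ctx.running_big_mem_ops.fetch_add(1, std::memory_order_relaxed);
}

inline void dec_running_big_mem_op(QueryContextSketch& ctx) {
    ctx.running_big_mem_ops.fetch_sub(1, std::memory_order_relaxed);
}

// One plausible use: divide the query's memory limit evenly among the
// operators that are actually active, so each can decide independently
// whether it should start revoking (spilling) memory.
inline int64_t per_operator_budget(const QueryContextSketch& ctx) {
    const int32_t n = ctx.running_big_mem_ops.load(std::memory_order_relaxed);
    return n > 0 ? ctx.query_mem_limit / n : ctx.query_mem_limit;
}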
50 changes: 30 additions & 20 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -17,6 +17,7 @@

#include "partitioned_hash_join_probe_operator.h"

#include "pipeline/pipeline_x/pipeline_x_task.h"
#include "util/mem_info.h"
#include "vec/spill/spill_stream_manager.h"

@@ -148,6 +149,10 @@ Status PartitionedHashJoinProbeLocalState::open(RuntimeState* state) {
return _partitioner->open(state);
}
Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
dec_running_big_mem_op_num(state);
RETURN_IF_ERROR(JoinProbeLocalState::close(state));
return Status::OK();
}
@@ -156,7 +161,8 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
uint32_t partition_index) {
auto& partitioned_build_blocks = _shared_state->partitioned_build_blocks;
auto& mutable_block = partitioned_build_blocks[partition_index];
if (!mutable_block || mutable_block->rows() < state->batch_size()) {
if (!mutable_block ||
mutable_block->allocated_bytes() < vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
--_spilling_task_count;
return Status::OK();
}
@@ -201,6 +207,8 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
--_spilling_task_count;

if (_spilling_task_count == 0) {
LOG(INFO) << "hash probe " << _parent->id()
<< " revoke memory spill_build_block finish";
std::unique_lock<std::mutex> lock(_spill_lock);
_dependency->set_ready();
}
@@ -225,7 +233,8 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat

auto& blocks = _probe_blocks[partition_index];
auto& partitioned_block = _partitioned_blocks[partition_index];
if (partitioned_block && partitioned_block->rows() >= state->batch_size()) {
if (partitioned_block && partitioned_block->allocated_bytes() >=
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
blocks.emplace_back(partitioned_block->to_block());
partitioned_block.reset();
}
@@ -263,6 +272,8 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
--_spilling_task_count;

if (_spilling_task_count == 0) {
LOG(INFO) << "hash probe " << _parent->id()
<< " revoke memory spill_probe_blocks finish";
std::unique_lock<std::mutex> lock(_spill_lock);
_dependency->set_ready();
}
@@ -304,8 +315,6 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
auto& spilled_stream = _shared_state->spilled_streams[partition_index];
has_data = false;
if (!spilled_stream) {
LOG(INFO) << "no data need to recovery for partition: " << partition_index
<< ", node id: " << _parent->id() << ", task id: " << state->task_id();
return Status::OK();
}

@@ -492,6 +501,7 @@ Status PartitionedHashJoinProbeOperatorX::open(RuntimeState* state) {
Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* input_block,
bool eos) const {
auto& local_state = get_local_state(state);
local_state.inc_running_big_mem_op_num(state);
const auto rows = input_block->rows();
auto& partitioned_blocks = local_state._partitioned_blocks;
if (rows == 0) {
@@ -694,17 +704,23 @@ size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state
auto& probe_blocks = local_state._probe_blocks;
for (uint32_t i = spilling_start; i < _partition_count; ++i) {
auto& build_block = partitioned_build_blocks[i];
if (build_block && build_block->rows() >= state->batch_size()) {
mem_size += build_block->allocated_bytes();
if (build_block) {
auto block_bytes = build_block->allocated_bytes();
if (block_bytes >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
mem_size += build_block->allocated_bytes();
}
}

for (auto& block : probe_blocks[i]) {
mem_size += block.allocated_bytes();
}

auto& partitioned_block = local_state._partitioned_blocks[i];
if (partitioned_block && partitioned_block->rows() >= state->batch_size()) {
mem_size += partitioned_block->allocated_bytes();
if (partitioned_block) {
auto block_bytes = partitioned_block->allocated_bytes();
if (block_bytes >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
mem_size += block_bytes;
}
}
}
return mem_size;
@@ -722,6 +738,8 @@ Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, bo
return Status::OK();
}

LOG(INFO) << "hash probe " << id()
<< " revoke memory, spill task count: " << local_state._spilling_task_count;
for (uint32_t i = spilling_start; i < _partition_count; ++i) {
RETURN_IF_ERROR(local_state.spill_build_block(state, i));
RETURN_IF_ERROR(local_state.spill_probe_blocks(state, i));
@@ -739,22 +757,14 @@

bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* state) const {
auto& local_state = get_local_state(state);

const auto revocable_size = revocable_mem_size(state);
if (PipelineXTask::should_revoke_memory(state, revocable_size)) {
return true;
}
if (local_state._shared_state->need_to_spill) {
const auto revocable_size = revocable_mem_size(state);
const auto min_revocable_size = state->min_revocable_mem();
return revocable_size > min_revocable_size;
}

auto sys_mem_available = MemInfo::sys_mem_available();
auto sys_mem_warning_water_mark = doris::MemInfo::sys_mem_available_warning_water_mark();

if (sys_mem_available <
sys_mem_warning_water_mark * config::spill_mem_warning_water_mark_multiplier) {
const auto revocable_size = revocable_mem_size(state);
const auto min_revocable_size = state->min_revocable_mem();
return min_revocable_size > 0 && revocable_size > min_revocable_size;
}
return false;
}

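In the hunk above, _should_revoke_memory() now delegates the common trigger to PipelineXTask::should_revoke_memory() and keeps only the need_to_spill fast path locally, so every spilling operator applies the same pressure threshold instead of duplicating it. A self-contained sketch of the kind of combined check such a shared helper plausibly performs follows; the MemPressureSnapshot struct and its fields are assumptions for illustration, not the actual pipeline_x_task implementation.

#include <cstdint>

// Assumed snapshot of the memory-pressure signals the shared helper consults.
struct MemPressureSnapshot {
    int64_t sys_mem_available;    // bytes currently available on the host
    int64_t warning_water_mark;   // spill once available memory drops below this
};

// Sketch of a centralized spill trigger: spill only when (a) the operator has
// buffered enough data for a spill to be worthwhile and (b) the system (or
// workload group) is under memory pressure.
inline bool should_revoke_memory_sketch(const MemPressureSnapshot& mem,
                                        int64_t revocable_bytes,
                                        int64_t min_revocable_bytes) {
    if (min_revocable_bytes > 0 && revocable_bytes <= min_revocable_bytes) {
        return false;  // too little buffered data; spilling would not help much
    }
    return mem.sys_mem_available < mem.warning_water_mark;
}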
28 changes: 22 additions & 6 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -48,15 +48,27 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(PipelineXSinkLocalState::open(state));
return _partitioner->open(state);
}
Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec_status) {
Contributor comment (clang-tidy): warning: method 'close' can be made static [readability-convert-member-functions-to-static]

be/src/pipeline/exec/partitioned_hash_join_sink_operator.h:48:

-     Status close(RuntimeState* state, Status exec_status) override;
+     static Status close(RuntimeState* state, Status exec_status) override;

SCOPED_TIMER(PipelineXSinkLocalState::exec_time_counter());
SCOPED_TIMER(PipelineXSinkLocalState::_close_timer);
if (PipelineXSinkLocalState::_closed) {
return Status::OK();
}
dec_running_big_mem_op_num(state);
return PipelineXSinkLocalState::close(state, exec_status);
}

Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
LOG(INFO) << "hash join sink " << _parent->id() << " revoke_memory"
<< ", eos: " << _child_eos;
DCHECK_EQ(_spilling_streams_count, 0);
_spilling_streams_count = _shared_state->partitioned_build_blocks.size();
for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); ++i) {
vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i];
auto& mutable_block = _shared_state->partitioned_build_blocks[i];

if (!mutable_block || mutable_block->rows() < state->batch_size()) {
if (!mutable_block ||
mutable_block->allocated_bytes() < vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
--_spilling_streams_count;
continue;
}
@@ -99,7 +111,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
if (_spilling_streams_count > 0) {
_dependency->block();
} else if (_child_eos) {
LOG(INFO) << "sink eos, set_ready_to_read, node id: " << _parent->id()
LOG(INFO) << "hash join sink " << _parent->id() << " set_ready_to_read"
<< ", task id: " << state->task_id();
_dependency->set_ready_to_read();
}
@@ -129,7 +141,7 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
std::unique_lock<std::mutex> lock(_spill_lock);
_dependency->set_ready();
if (_child_eos) {
LOG(INFO) << "sink eos, set_ready_to_read, node id: " << _parent->id()
LOG(INFO) << "hash join sink " << _parent->id() << " set_ready_to_read"
<< ", task id: " << state()->task_id();
_dependency->set_ready_to_read();
}
@@ -176,6 +188,7 @@ Status PartitionedHashJoinSinkOperatorX::open(RuntimeState* state) {
Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
bool eos) {
auto& local_state = get_local_state(state);
local_state.inc_running_big_mem_op_num(state);
SCOPED_TIMER(local_state.exec_time_counter());
if (!local_state._spill_status_ok) {
DCHECK_NE(local_state._spill_status.code(), 0);
@@ -227,7 +240,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
}

if (eos) {
LOG(INFO) << "sink eos, set_ready_to_read, node id: " << id()
LOG(INFO) << "hash join sink " << id() << " sink eos, set_ready_to_read"
<< ", task id: " << state->task_id();
local_state._dependency->set_ready_to_read();
}
@@ -243,8 +256,11 @@ size_t PartitionedHashJoinSinkOperatorX::revocable_mem_size(RuntimeState* state)
size_t mem_size = 0;
for (uint32_t i = 0; i != _partition_count; ++i) {
auto& block = partitioned_blocks[i];
if (block && block->rows() >= state->batch_size()) {
mem_size += block->allocated_bytes();
if (block) {
auto block_bytes = block->allocated_bytes();
if (block_bytes >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
mem_size += block_bytes;
}
}
}
return mem_size;
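A recurring change in this file and in the probe operator is the spill threshold: blocks now qualify for spilling by allocated memory (allocated_bytes() >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) rather than by row count (rows() >= state->batch_size()), so many-row but narrow blocks no longer trigger tiny, inefficient spill writes. Below is a self-contained sketch of the accumulation pattern; BlockSketch and the 32 MB constant are stand-ins, and the real threshold is defined alongside SpillStream.

#include <cstddef>
#include <memory>
#include <vector>

// Stand-in for a columnar block that can report its allocated memory.
struct BlockSketch {
    size_t bytes = 0;
    size_t allocated_bytes() const { return bytes; }
};

// Stand-in value; the real constant is vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM.
constexpr size_t kMinSpillWriteBatchMem = 32u * 1024 * 1024;

// Count as revocable only the blocks whose spill write would be large enough
// to be worth the disk I/O; small blocks stay in memory.
size_t revocable_mem_size_sketch(
        const std::vector<std::unique_ptr<BlockSketch>>& partitioned_blocks) {
    size_t mem_size = 0;
    for (const auto& block : partitioned_blocks) {
        if (block && block->allocated_bytes() >= kMinSpillWriteBatchMem) {
            mem_size += block->allocated_bytes();
        }
    }
    return mem_size;
}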
1 change: 1 addition & 0 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -46,6 +46,7 @@ class PartitionedHashJoinSinkLocalState
~PartitionedHashJoinSinkLocalState() override = default;
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
Status revoke_memory(RuntimeState* state);

protected:
8 changes: 8 additions & 0 deletions be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -80,6 +80,10 @@ Status SpillSortSinkLocalState::close(RuntimeState* state, Status execsink_statu
_spill_cv.wait(lk);
}
}
auto& parent = Base::_parent->template cast<Parent>();
if (parent._enable_spill) {
dec_running_big_mem_op_num(state);
}
return Status::OK();
}
Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) {
@@ -160,6 +164,9 @@ size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block,
bool eos) {
auto& local_state = get_local_state(state);
if (_enable_spill) {
local_state.inc_running_big_mem_op_num(state);
}
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(local_state.Base::_shared_state->sink_status);
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
@@ -176,6 +183,7 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc
RETURN_IF_ERROR(revoke_memory(state));
} else {
local_state._dependency->set_ready_to_read();
local_state._finish_dependency->set_ready();
}
} else {
RETURN_IF_ERROR(
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -64,6 +64,9 @@ Status SpillSortLocalState::close(RuntimeState* state) {
_merge_spill_cv.wait(lk);
}
}
if (Base::_shared_state->enable_spill) {
dec_running_big_mem_op_num(state);
}
RETURN_IF_ERROR(Base::close(state));
for (auto& stream : _current_merging_streams) {
(void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
@@ -249,6 +252,9 @@ Status SpillSortSourceOperatorX::close(RuntimeState* state) {
Status SpillSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
bool* eos) {
auto& local_state = get_local_state(state);
if (local_state.Base::_shared_state->enable_spill) {
local_state.inc_running_big_mem_op_num(state);
}
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(local_state._status);
