Skip to content

Commit

Permalink
fix and test
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg committed Apr 9, 2024
1 parent d3ad912 commit 54b9f4e
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 15 deletions.
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,7 @@ DEFINE_Int32(partition_disk_index_lru_size, "10000");
DEFINE_String(spill_storage_root_path, "${DORIS_HOME}/storage");
DEFINE_mInt32(spill_storage_usage_percent, "20"); // 20%
// Wait time between two passes of the spill GC background thread.
DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
// Argument handed to SpillStreamManager::gc() on every pass of the spill GC thread
// (replaces the previously hard-coded 2000).
// NOTE(review): presumably an upper bound on the files handled per pass -- confirm against gc().
DEFINE_mInt32(spill_gc_file_count, "2000");
DEFINE_Int32(spill_io_thread_pool_per_disk_thread_num, "2");
DEFINE_Int32(spill_io_thread_pool_queue_size, "1024");
DEFINE_Int32(spill_async_task_thread_pool_thread_num, "2");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,7 @@ DECLARE_Int32(partition_disk_index_lru_size);
DECLARE_String(spill_storage_root_path);
DECLARE_mInt32(spill_storage_usage_percent);
// Wait time between two passes of the spill GC background thread.
DECLARE_mInt32(spill_gc_interval_ms);
// Argument handed to SpillStreamManager::gc() on every pass of the spill GC thread.
DECLARE_mInt32(spill_gc_file_count);
DECLARE_Int32(spill_io_thread_pool_per_disk_thread_num);
DECLARE_Int32(spill_io_thread_pool_queue_size);
DECLARE_Int32(spill_async_task_thread_pool_thread_num);
Expand Down
12 changes: 7 additions & 5 deletions be/src/olap/task/index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,12 +406,14 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
}

// write inverted index data
if (_write_inverted_index_data(output_rowset_schema, iter->data_id(),
block.get()) != Status::OK()) {
return Status::Error<ErrorCode::SCHEMA_CHANGE_INFO_INVALID>(
"failed to write block.");
if (!block->empty()) {
if (_write_inverted_index_data(output_rowset_schema, iter->data_id(),
block.get()) != Status::OK()) {
return Status::Error<ErrorCode::SCHEMA_CHANGE_INFO_INVALID>(
"failed to write block.");
}
block->clear_column_data();
}
block->clear_column_data();
}

// finish write inverted index, flush data to compound file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ Status PartitionedHashJoinProbeOperatorX::prepare(RuntimeState* state) {
// to avoid prepare _child_x twice
auto child_x = std::move(_child_x);
RETURN_IF_ERROR(JoinProbeOperatorX::prepare(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc));
RETURN_IF_ERROR(_inner_probe_operator->set_child(child_x));
DCHECK(_build_side_child != nullptr);
_inner_probe_operator->set_build_side_child(_build_side_child);
Expand Down Expand Up @@ -682,7 +683,9 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,

auto block = std::move(probe_blocks.back());
probe_blocks.pop_back();
RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, false));
if (!block.empty()) {
RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, false));
}
}

RETURN_IF_ERROR(_inner_probe_operator->pull(local_state._runtime_state.get(), output_block,
Expand Down
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
DCHECK_EQ(_spilling_streams_count, 0);

if (!_shared_state->need_to_spill) {
_shared_state->need_to_spill = true;
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
_shared_state->inner_shared_state->hash_table_variants.reset();
auto row_desc = p._child_x->row_desc();
Expand Down Expand Up @@ -172,7 +173,6 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
}

if (_spilling_streams_count > 0) {
_shared_state->need_to_spill = true;
std::unique_lock<std::mutex> lock(_spill_lock);
if (_spilling_streams_count > 0) {
_dependency->block();
Expand Down Expand Up @@ -202,7 +202,8 @@ Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
SCOPED_TIMER(_partition_shuffle_timer);
auto* channel_ids = reinterpret_cast<uint64_t*>(_partitioner->get_channel_ids());
std::vector<uint32_t> partition_indexes[p._partition_count];
for (uint32_t i = 0; i != rows; ++i) {
DCHECK_LT(begin, end);
for (size_t i = begin; i != end; ++i) {
partition_indexes[channel_ids[i]].emplace_back(i);
}

Expand Down
10 changes: 6 additions & 4 deletions be/src/runtime/workload_group/workload_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
}

auto process_mem_used = doris::MemInfo::proc_mem_no_allocator_cache();
auto sys_mem_available = doris::MemInfo::sys_mem_available();
if (proc_vm_rss < all_queries_mem_used) {
all_queries_mem_used = proc_vm_rss;
}
Expand All @@ -185,14 +184,15 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
// we count these cache memories equally on workload groups.
double ratio = (double)proc_vm_rss / (double)all_queries_mem_used;
if (ratio >= 1.25) {
auto sys_mem_available = doris::MemInfo::sys_mem_available();
std::string debug_msg = fmt::format(
"\nProcess Memory Summary: process_vm_rss: {}, process mem: {}, sys mem available: "
"{}, all quries mem: {}",
PrettyPrinter::print(proc_vm_rss, TUnit::BYTES),
PrettyPrinter::print(process_mem_used, TUnit::BYTES),
PrettyPrinter::print(sys_mem_available, TUnit::BYTES),
PrettyPrinter::print(all_queries_mem_used, TUnit::BYTES));
LOG_EVERY_N(INFO, 10) << debug_msg;
LOG_EVERY_T(INFO, 1) << debug_msg;
}

for (auto& wg : _workload_groups) {
Expand Down Expand Up @@ -229,8 +229,10 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
PrettyPrinter::print(query_weighted_mem_limit, TUnit::BYTES));

debug_msg += "\n Query Memory Summary:";
} else {
continue;
}
// check where queries need to revoke memory for task group
// check whether queries need to revoke memory for task group
for (const auto& query : wg_queries) {
auto query_ctx = query.second.lock();
if (!query_ctx) {
Expand All @@ -253,7 +255,7 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
}
}
if (wg_mem_info.is_high_wartermark || wg_mem_info.is_low_wartermark) {
LOG_EVERY_N(INFO, 10) << debug_msg;
LOG_EVERY_T(INFO, 1) << debug_msg;
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/spill/spill_stream_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ Status SpillStreamManager::init() {
void SpillStreamManager::_spill_gc_thread_callback() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::spill_gc_interval_ms))) {
gc(2000);
gc(config::spill_gc_file_count);
for (auto& [path, dir] : _spill_store_map) {
static_cast<void>(dir->update_capacity());
}
Expand Down Expand Up @@ -322,7 +322,8 @@ bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) {
: (_disk_capacity_bytes - _available_bytes + incoming_data_size) /
(double)_disk_capacity_bytes;
VLOG_DEBUG << fmt::format(
"spill path: {}, capacity: {}, available: {}, used pct: {}, incoming bytes: {}",
"spill data path: {}, capacity: {}, available: {}, used pct: {}, incoming bytes: "
"{}",
_path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
PrettyPrinter::print_bytes(_available_bytes), used_pct,
PrettyPrinter::print_bytes(incoming_data_size));
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/spill/spill_stream_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class SpillDataDir {
friend class SpillStreamManager;
std::string _path;

bool _shared_with_storage_path;
const bool _shared_with_storage_path;

// protect _disk_capacity_bytes, _available_bytes, _limit_bytes, _used_bytes
std::mutex _mutex;
Expand Down

0 comments on commit 54b9f4e

Please sign in to comment.