diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 4660d25428ed1cb..6d165981bef866e 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1162,6 +1162,7 @@ DEFINE_Int32(partition_disk_index_lru_size, "10000"); DEFINE_String(spill_storage_root_path, "${DORIS_HOME}/storage"); DEFINE_mInt32(spill_storage_usage_percent, "20"); // 20% DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s +DEFINE_mInt32(spill_gc_file_count, "2000"); DEFINE_Int32(spill_io_thread_pool_per_disk_thread_num, "2"); DEFINE_Int32(spill_io_thread_pool_queue_size, "1024"); DEFINE_Int32(spill_async_task_thread_pool_thread_num, "2"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 1c8b077f1dd50fc..4eb997e37202ddd 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1244,6 +1244,7 @@ DECLARE_Int32(partition_disk_index_lru_size); DECLARE_String(spill_storage_root_path); DECLARE_mInt32(spill_storage_usage_percent); DECLARE_mInt32(spill_gc_interval_ms); +DECLARE_mInt32(spill_gc_file_count); DECLARE_Int32(spill_io_thread_pool_per_disk_thread_num); DECLARE_Int32(spill_io_thread_pool_queue_size); DECLARE_Int32(spill_async_task_thread_pool_thread_num); diff --git a/be/src/olap/task/index_builder.cpp b/be/src/olap/task/index_builder.cpp index 85ba65820ebc76e..5ef64c1fbc58b97 100644 --- a/be/src/olap/task/index_builder.cpp +++ b/be/src/olap/task/index_builder.cpp @@ -406,12 +406,14 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta } // write inverted index data - if (_write_inverted_index_data(output_rowset_schema, iter->data_id(), - block.get()) != Status::OK()) { - return Status::Error( - "failed to write block."); + if (!block->empty()) { + if (_write_inverted_index_data(output_rowset_schema, iter->data_id(), + block.get()) != Status::OK()) { + return Status::Error( + "failed to write block."); + } + block->clear_column_data(); } - block->clear_column_data(); } // finish write inverted index, flush data to compound file diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 0f837f8bbdad1a4..09e33aae10a7703 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -493,6 +493,7 @@ Status PartitionedHashJoinProbeOperatorX::prepare(RuntimeState* state) { // to avoid prepare _child_x twice auto child_x = std::move(_child_x); RETURN_IF_ERROR(JoinProbeOperatorX::prepare(state)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc)); RETURN_IF_ERROR(_inner_probe_operator->set_child(child_x)); DCHECK(_build_side_child != nullptr); _inner_probe_operator->set_build_side_child(_build_side_child); @@ -682,7 +683,9 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, auto block = std::move(probe_blocks.back()); probe_blocks.pop_back(); - RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, false)); + if (!block.empty()) { + RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, false)); + } } RETURN_IF_ERROR(_inner_probe_operator->pull(local_state._runtime_state.get(), output_block, diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 370606b190454cc..1f271c15e09a61e 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -93,6 +93,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { DCHECK_EQ(_spilling_streams_count, 0); if (!_shared_state->need_to_spill) { + _shared_state->need_to_spill = true; auto& p = _parent->cast(); _shared_state->inner_shared_state->hash_table_variants.reset(); auto row_desc = p._child_x->row_desc(); @@ -172,7 +173,6 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { } if (_spilling_streams_count > 0) { - _shared_state->need_to_spill = true; std::unique_lock lock(_spill_lock); if (_spilling_streams_count > 0) { _dependency->block(); @@ -202,7 +202,8 @@ Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state, SCOPED_TIMER(_partition_shuffle_timer); auto* channel_ids = reinterpret_cast(_partitioner->get_channel_ids()); std::vector partition_indexes[p._partition_count]; - for (uint32_t i = 0; i != rows; ++i) { + DCHECK_LT(begin, end); + for (size_t i = begin; i != end; ++i) { partition_indexes[channel_ids[i]].emplace_back(i); } diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 78e2cd1fe060576..55d3ff497da946f 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -174,7 +174,6 @@ void WorkloadGroupMgr::refresh_wg_memory_info() { } auto process_mem_used = doris::MemInfo::proc_mem_no_allocator_cache(); - auto sys_mem_available = doris::MemInfo::sys_mem_available(); if (proc_vm_rss < all_queries_mem_used) { all_queries_mem_used = proc_vm_rss; } @@ -185,6 +184,7 @@ void WorkloadGroupMgr::refresh_wg_memory_info() { // we count these cache memories equally on workload groups. double ratio = (double)proc_vm_rss / (double)all_queries_mem_used; if (ratio >= 1.25) { + auto sys_mem_available = doris::MemInfo::sys_mem_available(); std::string debug_msg = fmt::format( "\nProcess Memory Summary: process_vm_rss: {}, process mem: {}, sys mem available: " "{}, all quries mem: {}", @@ -192,7 +192,7 @@ void WorkloadGroupMgr::refresh_wg_memory_info() { PrettyPrinter::print(process_mem_used, TUnit::BYTES), PrettyPrinter::print(sys_mem_available, TUnit::BYTES), PrettyPrinter::print(all_queries_mem_used, TUnit::BYTES)); - LOG_EVERY_N(INFO, 10) << debug_msg; + LOG_EVERY_T(INFO, 1) << debug_msg; } for (auto& wg : _workload_groups) { @@ -229,8 +229,10 @@ void WorkloadGroupMgr::refresh_wg_memory_info() { PrettyPrinter::print(query_weighted_mem_limit, TUnit::BYTES)); debug_msg += "\n Query Memory Summary:"; + } else { + continue; } - // check where queries need to revoke memory for task group + // check whether queries need to revoke memory for task group for (const auto& query : wg_queries) { auto query_ctx = query.second.lock(); if (!query_ctx) { @@ -253,7 +255,7 @@ void WorkloadGroupMgr::refresh_wg_memory_info() { } } if (wg_mem_info.is_high_wartermark || wg_mem_info.is_low_wartermark) { - LOG_EVERY_N(INFO, 10) << debug_msg; + LOG_EVERY_T(INFO, 1) << debug_msg; } } } diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp index f298d5ffec4f138..33f42c1711df450 100644 --- a/be/src/vec/spill/spill_stream_manager.cpp +++ b/be/src/vec/spill/spill_stream_manager.cpp @@ -109,7 +109,7 @@ Status SpillStreamManager::init() { void SpillStreamManager::_spill_gc_thread_callback() { while (!_stop_background_threads_latch.wait_for( std::chrono::milliseconds(config::spill_gc_interval_ms))) { - gc(2000); + gc(config::spill_gc_file_count); for (auto& [path, dir] : _spill_store_map) { static_cast(dir->update_capacity()); } @@ -322,7 +322,8 @@ bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) { : (_disk_capacity_bytes - _available_bytes + incoming_data_size) / (double)_disk_capacity_bytes; VLOG_DEBUG << fmt::format( - "spill path: {}, capacity: {}, available: {}, used pct: {}, incoming bytes: {}", + "spill data path: {}, capacity: {}, available: {}, used pct: {}, incoming bytes: " + "{}", _path, PrettyPrinter::print_bytes(_disk_capacity_bytes), PrettyPrinter::print_bytes(_available_bytes), used_pct, PrettyPrinter::print_bytes(incoming_data_size)); diff --git a/be/src/vec/spill/spill_stream_manager.h b/be/src/vec/spill/spill_stream_manager.h index 104bd07ef8b54f5..06060327546f50b 100644 --- a/be/src/vec/spill/spill_stream_manager.h +++ b/be/src/vec/spill/spill_stream_manager.h @@ -93,7 +93,7 @@ class SpillDataDir { friend class SpillStreamManager; std::string _path; - bool _shared_with_storage_path; + const bool _shared_with_storage_path; // protect _disk_capacity_bytes, _available_bytes, _limit_bytes, _used_bytes std::mutex _mutex;