diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index abe2fde555e1647..f820914b33ee940 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -180,42 +180,49 @@ Status PartitionSortSinkOperatorX::_emplace_into_hash_table( const vectorized::ColumnRawPtrs& key_columns, const vectorized::Block* input_block, PartitionSortSinkLocalState& local_state, bool eos) { return std::visit( - [&](auto&& agg_method) -> Status { - SCOPED_TIMER(local_state._build_timer); - using HashMethodType = std::decay_t; - using AggState = typename HashMethodType::State; + vectorized::Overload { + [&](std::monostate& arg) -> Status { + return Status::InternalError("Unit hash table"); + }, + [&](auto& agg_method) -> Status { + SCOPED_TIMER(local_state._build_timer); + using HashMethodType = std::decay_t; + using AggState = typename HashMethodType::State; - AggState state(key_columns); - size_t num_rows = input_block->rows(); - agg_method.init_serialized_keys(key_columns, num_rows); + AggState state(key_columns); + size_t num_rows = input_block->rows(); + agg_method.init_serialized_keys(key_columns, num_rows); - auto creator = [&](const auto& ctor, auto& key, auto& origin) { - HashMethodType::try_presis_key(key, origin, *local_state._agg_arena_pool); - auto* aggregate_data = _pool->add(new vectorized::PartitionBlocks( - local_state._partition_sort_info, local_state._value_places.empty())); - local_state._value_places.push_back(aggregate_data); - ctor(key, aggregate_data); - local_state._num_partition++; - }; - auto creator_for_null_key = [&](auto& mapped) { - mapped = _pool->add(new vectorized::PartitionBlocks( - local_state._partition_sort_info, local_state._value_places.empty())); - local_state._value_places.push_back(mapped); - local_state._num_partition++; - }; + auto creator = [&](const auto& ctor, auto& key, auto& origin) { + HashMethodType::try_presis_key(key, origin, + *local_state._agg_arena_pool); + auto* aggregate_data = _pool->add(new vectorized::PartitionBlocks( + local_state._partition_sort_info, + local_state._value_places.empty())); + local_state._value_places.push_back(aggregate_data); + ctor(key, aggregate_data); + local_state._num_partition++; + }; + auto creator_for_null_key = [&](auto& mapped) { + mapped = _pool->add(new vectorized::PartitionBlocks( + local_state._partition_sort_info, + local_state._value_places.empty())); + local_state._value_places.push_back(mapped); + local_state._num_partition++; + }; - SCOPED_TIMER(local_state._emplace_key_timer); - for (size_t row = 0; row < num_rows; ++row) { - auto& mapped = - agg_method.lazy_emplace(state, row, creator, creator_for_null_key); - mapped->add_row_idx(row); - } - for (auto* place : local_state._value_places) { - SCOPED_TIMER(local_state._selector_block_timer); - RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos)); - } - return Status::OK(); - }, + SCOPED_TIMER(local_state._emplace_key_timer); + for (size_t row = 0; row < num_rows; ++row) { + auto& mapped = agg_method.lazy_emplace(state, row, creator, + creator_for_null_key); + mapped->add_row_idx(row); + } + for (auto* place : local_state._value_places) { + SCOPED_TIMER(local_state._selector_block_timer); + RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos)); + } + return Status::OK(); + }}, local_state._partitioned_data->method_variant); } diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index b610e1b9ed3b366..92cd341de196cb0 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -298,12 +298,17 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { }}; auto* runtime_state = _runtime_state.get(); auto* agg_data = parent._agg_sink_operator->get_agg_data(runtime_state); - Base::_shared_state->sink_status = std::visit( - [&](auto&& agg_method) -> Status { - auto& hash_table = *agg_method.hash_table; - return _spill_hash_table(state, agg_method, hash_table, _eos); - }, - agg_data->method_variant); + Base::_shared_state->sink_status = + std::visit(vectorized::Overload { + [&](std::monostate& arg) -> Status { + return Status::InternalError("Unit hash table"); + }, + [&](auto& agg_method) -> Status { + auto& hash_table = *agg_method.hash_table; + RETURN_IF_CATCH_EXCEPTION(return _spill_hash_table( + state, agg_method, hash_table, _eos)); + }}, + agg_data->method_variant); RETURN_IF_ERROR(Base::_shared_state->sink_status); Base::_shared_state->sink_status = parent._agg_sink_operator->reset_hash_table(runtime_state); diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 43c805b9557ae57..6d871451bfd4df2 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -214,80 +214,83 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime MonotonicStopWatch submit_timer; submit_timer.start(); + auto spill_func = [this, state, query_id, execution_context, submit_timer] { + _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); + Defer defer {[&]() { + if (!_status.ok() || state->is_cancelled()) { + if (!_status.ok()) { + LOG(WARNING) << "query " << print_id(query_id) << " agg node " + << _parent->node_id() + << " merge spilled agg data error: " << _status; + } + _shared_state->close(); + } else if (_shared_state->spill_partitions.empty()) { + VLOG_DEBUG << "query " << print_id(query_id) << " agg node " << _parent->node_id() + << " merge spilled agg data finish"; + } + Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once(); + _is_merging = false; + _dependency->Dependency::set_ready(); + }}; + bool has_agg_data = false; + auto& parent = Base::_parent->template cast(); + while (!state->is_cancelled() && !has_agg_data && + !_shared_state->spill_partitions.empty()) { + for (auto& stream : _shared_state->spill_partitions[0]->spill_streams_) { + stream->set_read_counters(Base::_spill_read_data_time, + Base::_spill_deserialize_time, Base::_spill_read_bytes, + Base::_spill_read_wait_io_timer); + vectorized::Block block; + bool eos = false; + while (!eos && !state->is_cancelled()) { + { + SCOPED_TIMER(Base::_spill_recover_time); + _status = stream->read_next_block_sync(&block, &eos); + } + RETURN_IF_ERROR(_status); + + if (!block.empty()) { + has_agg_data = true; + _status = parent._agg_source_operator + ->merge_with_serialized_key_helper( + _runtime_state.get(), &block); + RETURN_IF_ERROR(_status); + } + } + (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream); + } + _shared_state->spill_partitions.pop_front(); + } + if (_shared_state->spill_partitions.empty()) { + _shared_state->close(); + } + return _status; + }; + + auto exception_catch_func = [spill_func, query_id, mem_tracker, shared_state_holder, + execution_context, this]() { + SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); + std::shared_ptr execution_context_lock; + auto shared_state_sptr = shared_state_holder.lock(); + if (shared_state_sptr) { + execution_context_lock = execution_context.lock(); + } + if (!shared_state_sptr || !execution_context_lock) { + LOG(INFO) << "query " << print_id(query_id) + << " execution_context released, maybe query was cancelled."; + return; + } + + auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }(); + + if (!status.ok()) { + _status = status; + } + }; RETURN_IF_ERROR( ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( - [this, state, query_id, mem_tracker, shared_state_holder, execution_context, - submit_timer] { - SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); - std::shared_ptr execution_context_lock; - auto shared_state_sptr = shared_state_holder.lock(); - if (shared_state_sptr) { - execution_context_lock = execution_context.lock(); - } - if (!shared_state_sptr || !execution_context_lock) { - LOG(INFO) << "query " << print_id(query_id) - << " execution_context released, maybe query was cancelled."; - // FIXME: return status is meaningless? - return Status::Cancelled("Cancelled"); - } - - _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); - Defer defer {[&]() { - if (!_status.ok() || state->is_cancelled()) { - if (!_status.ok()) { - LOG(WARNING) << "query " << print_id(query_id) << " agg node " - << _parent->node_id() - << " merge spilled agg data error: " << _status; - } - _shared_state->close(); - } else if (_shared_state->spill_partitions.empty()) { - VLOG_DEBUG << "query " << print_id(query_id) << " agg node " - << _parent->node_id() - << " merge spilled agg data finish"; - } - Base::_shared_state->in_mem_shared_state->aggregate_data_container - ->init_once(); - _is_merging = false; - _dependency->Dependency::set_ready(); - }}; - bool has_agg_data = false; - auto& parent = Base::_parent->template cast(); - while (!state->is_cancelled() && !has_agg_data && - !_shared_state->spill_partitions.empty()) { - for (auto& stream : - _shared_state->spill_partitions[0]->spill_streams_) { - stream->set_read_counters( - Base::_spill_read_data_time, Base::_spill_deserialize_time, - Base::_spill_read_bytes, Base::_spill_read_wait_io_timer); - vectorized::Block block; - bool eos = false; - while (!eos && !state->is_cancelled()) { - { - SCOPED_TIMER(Base::_spill_recover_time); - _status = stream->read_next_block_sync(&block, &eos); - } - RETURN_IF_ERROR(_status); - - if (!block.empty()) { - has_agg_data = true; - _status = parent._agg_source_operator - ->merge_with_serialized_key_helper( - _runtime_state.get(), &block); - RETURN_IF_ERROR(_status); - } - } - (void)ExecEnv::GetInstance() - ->spill_stream_mgr() - ->delete_spill_stream(stream); - } - _shared_state->spill_partitions.pop_front(); - } - if (_shared_state->spill_partitions.empty()) { - _shared_state->close(); - } - return _status; - })); + exception_catch_func)); return Status::OK(); } } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 10fa2effcc00fc7..6c5ab3edb3f2ed4 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -153,28 +153,7 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) { return Status::OK(); } -Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state, - uint32_t partition_index) { - auto& partitioned_build_blocks = _shared_state->partitioned_build_blocks; - auto& mutable_block = partitioned_build_blocks[partition_index]; - if (!mutable_block || - mutable_block->allocated_bytes() < vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { - --_spilling_task_count; - return Status::OK(); - } - - auto& build_spilling_stream = _shared_state->spilled_streams[partition_index]; - if (!build_spilling_stream) { - RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( - state, build_spilling_stream, print_id(state->query_id()), "hash_build_sink", - _parent->id(), std::numeric_limits::max(), - std::numeric_limits::max(), _runtime_profile.get())); - RETURN_IF_ERROR(build_spilling_stream->prepare_spill()); - build_spilling_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count, - _spill_data_size, _spill_write_disk_timer, - _spill_write_wait_io_timer); - } - +Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* state) { auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); auto execution_context = state->get_task_execution_context(); /// Resources in shared state will be released when the operator is closed, @@ -182,14 +161,56 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state /// So, we need hold the pointer of shared state. std::weak_ptr shared_state_holder = _shared_state->shared_from_this(); + auto query_id = state->query_id(); auto mem_tracker = state->get_query_ctx()->query_mem_tracker; MonotonicStopWatch submit_timer; submit_timer.start(); - return spill_io_pool->submit_func([query_id, mem_tracker, shared_state_holder, - execution_context, state, &build_spilling_stream, - &mutable_block, submit_timer, this] { + auto spill_func = [query_id, state, submit_timer, this] { + _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); + SCOPED_TIMER(_spill_probe_timer); + + auto& p = _parent->cast(); + for (uint32_t partition_index = 0; partition_index != p._partition_count; + ++partition_index) { + auto& blocks = _probe_blocks[partition_index]; + auto& partitioned_block = _partitioned_blocks[partition_index]; + if (partitioned_block && partitioned_block->allocated_bytes() >= + vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { + blocks.emplace_back(partitioned_block->to_block()); + partitioned_block.reset(); + } + + auto& spilling_stream = _probe_spilling_streams[partition_index]; + if (!spilling_stream) { + RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( + state, spilling_stream, print_id(state->query_id()), "hash_probe", + _parent->id(), std::numeric_limits::max(), + std::numeric_limits::max(), _runtime_profile.get())); + RETURN_IF_ERROR(spilling_stream->prepare_spill()); + spilling_stream->set_write_counters( + _spill_serialize_block_timer, _spill_block_count, _spill_data_size, + _spill_write_disk_timer, _spill_write_wait_io_timer); + } + + COUNTER_UPDATE(_spill_probe_blocks, blocks.size()); + while (!blocks.empty() && !state->is_cancelled()) { + auto block = std::move(blocks.back()); + blocks.pop_back(); + RETURN_IF_ERROR(spilling_stream->spill_block(state, block, false)); + COUNTER_UPDATE(_spill_probe_rows, block.rows()); + } + } + VLOG_DEBUG << "query: " << print_id(query_id) + << " hash probe revoke done, node: " << p.node_id() + << ", task: " << state->task_id(); + _dependency->set_ready(); + return Status::OK(); + }; + + auto exception_catch_func = [query_id, mem_tracker, shared_state_holder, execution_context, + spill_func, this]() { SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); std::shared_ptr execution_context_lock; auto shared_state_sptr = shared_state_holder.lock(); @@ -201,116 +222,18 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state << " execution_context released, maybe query was cancelled."; return; } - _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); - SCOPED_TIMER(_spill_build_timer); - if (_spill_status_ok) { - auto build_block = mutable_block->to_block(); - DCHECK_EQ(mutable_block->rows(), 0); - auto st = build_spilling_stream->spill_block(state, build_block, false); - if (!st.ok()) { - std::unique_lock lock(_spill_lock); - _spill_status_ok = false; - _spill_status = std::move(st); - } else { - COUNTER_UPDATE(_spill_build_rows, build_block.rows()); - COUNTER_UPDATE(_spill_build_blocks, 1); - } - } - - std::unique_lock lock(_spill_lock); - if (_spilling_task_count.fetch_sub(1) == 1) { - LOG(INFO) << "hash probe " << _parent->id() - << " revoke memory spill_build_block finish"; - _dependency->set_ready(); - } - }); -} -Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* state, - uint32_t partition_index) { - auto& spilling_stream = _probe_spilling_streams[partition_index]; - if (!spilling_stream) { - RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( - state, spilling_stream, print_id(state->query_id()), "hash_probe", _parent->id(), - std::numeric_limits::max(), std::numeric_limits::max(), - _runtime_profile.get())); - RETURN_IF_ERROR(spilling_stream->prepare_spill()); - spilling_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count, - _spill_data_size, _spill_write_disk_timer, - _spill_write_wait_io_timer); - } + auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }(); - auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); - - auto& blocks = _probe_blocks[partition_index]; - auto& partitioned_block = _partitioned_blocks[partition_index]; - if (partitioned_block && partitioned_block->allocated_bytes() >= - vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { - blocks.emplace_back(partitioned_block->to_block()); - partitioned_block.reset(); - } - - if (!blocks.empty()) { - auto execution_context = state->get_task_execution_context(); - /// Resources in shared state will be released when the operator is closed, - /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. - /// So, we need hold the pointer of shared state. - std::weak_ptr shared_state_holder = - _shared_state->shared_from_this(); - - auto query_id = state->query_id(); - auto mem_tracker = state->get_query_ctx()->query_mem_tracker; - - MonotonicStopWatch submit_timer; - submit_timer.start(); - return spill_io_pool->submit_func([query_id, mem_tracker, shared_state_holder, - execution_context, state, &blocks, spilling_stream, - submit_timer, this] { - SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); - std::shared_ptr execution_context_lock; - auto shared_state_sptr = shared_state_holder.lock(); - if (shared_state_sptr) { - execution_context_lock = execution_context.lock(); - } - if (!shared_state_sptr || !execution_context_lock) { - LOG(INFO) << "query: " << print_id(query_id) - << " execution_context released, maybe query was cancelled."; - return; - } - _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); - SCOPED_TIMER(_spill_probe_timer); - COUNTER_UPDATE(_spill_probe_blocks, blocks.size()); - while (!blocks.empty() && !state->is_cancelled()) { - auto block = std::move(blocks.back()); - blocks.pop_back(); - if (_spill_status_ok) { - auto st = spilling_stream->spill_block(state, block, false); - if (!st.ok()) { - std::unique_lock lock(_spill_lock); - _spill_status_ok = false; - _spill_status = std::move(st); - break; - } - COUNTER_UPDATE(_spill_probe_rows, block.rows()); - } else { - break; - } - } - - std::unique_lock lock(_spill_lock); - if (_spilling_task_count.fetch_sub(1) == 1) { - LOG(INFO) << "hash probe " << _parent->id() - << " revoke memory spill_probe_blocks finish"; - _dependency->set_ready(); - } - }); - } else { - std::unique_lock lock(_spill_lock); - if (_spilling_task_count.fetch_sub(1) == 1) { - _dependency->set_ready(); + if (!status.ok()) { + _spill_status_ok = false; + _spill_status = std::move(status); } - } - return Status::OK(); + _dependency->set_ready(); + }; + + _dependency->block(); + return spill_io_pool->submit_func(exception_catch_func); } Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_index) { @@ -361,16 +284,10 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti MonotonicStopWatch submit_timer; submit_timer.start(); - auto read_func = [this, query_id, mem_tracker, state, spilled_stream = spilled_stream, - &mutable_block, shared_state_holder, execution_context, submit_timer, - partition_index] { - SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); - std::shared_ptr execution_context_lock; + auto read_func = [this, query_id, state, spilled_stream = spilled_stream, &mutable_block, + shared_state_holder, submit_timer, partition_index] { auto shared_state_sptr = shared_state_holder.lock(); - if (shared_state_sptr) { - execution_context_lock = execution_context.lock(); - } - if (!shared_state_sptr || !execution_context_lock || state->is_cancelled()) { + if (!shared_state_sptr || state->is_cancelled()) { LOG(INFO) << "query: " << print_id(query_id) << " execution_context released, maybe query was cancelled."; return; @@ -378,15 +295,12 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); SCOPED_TIMER(_recovery_build_timer); - Defer defer([this] { --_spilling_task_count; }); - DCHECK_EQ(_spill_status_ok.load(), true); bool eos = false; while (!eos) { vectorized::Block block; auto st = spilled_stream->read_next_block_sync(&block, &eos); if (!st.ok()) { - std::unique_lock lock(_spill_lock); _spill_status_ok = false; _spill_status = std::move(st); break; @@ -409,7 +323,6 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti DCHECK_EQ(mutable_block->columns(), block.columns()); st = mutable_block->merge(std::move(block)); if (!st.ok()) { - std::unique_lock lock(_spill_lock); _spill_status_ok = false; _spill_status = std::move(st); break; @@ -425,16 +338,36 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti _dependency->set_ready(); }; + auto exception_catch_func = [read_func, query_id, mem_tracker, shared_state_holder, + execution_context, state, this]() { + SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); + std::shared_ptr execution_context_lock; + auto shared_state_sptr = shared_state_holder.lock(); + if (shared_state_sptr) { + execution_context_lock = execution_context.lock(); + } + if (!shared_state_sptr || !execution_context_lock || state->is_cancelled()) { + LOG(INFO) << "query: " << print_id(query_id) + << " execution_context released, maybe query was cancelled."; + return; + } + + auto status = [&]() { + RETURN_IF_CATCH_EXCEPTION(read_func()); + return Status::OK(); + }(); + + if (!status.ok()) { + _spill_status_ok = false; + _spill_status = std::move(status); + } + }; + auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); has_data = true; _dependency->block(); - ++_spilling_task_count; - auto st = spill_io_pool->submit_func(read_func); - if (!st.ok()) { - --_spilling_task_count; - } - return st; + return spill_io_pool->submit_func(exception_catch_func); } Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(RuntimeState* state, @@ -459,30 +392,14 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti MonotonicStopWatch submit_timer; submit_timer.start(); - auto read_func = [this, query_id, mem_tracker, shared_state_holder, execution_context, - &spilled_stream, &blocks, submit_timer] { - SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); - std::shared_ptr execution_context_lock; - auto shared_state_sptr = shared_state_holder.lock(); - if (shared_state_sptr) { - execution_context_lock = execution_context.lock(); - } - if (!shared_state_sptr || !execution_context_lock) { - LOG(INFO) << "query: " << print_id(query_id) - << " execution_context released, maybe query was cancelled."; - return; - } - + auto read_func = [this, query_id, &spilled_stream, &blocks, submit_timer] { _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); SCOPED_TIMER(_recovery_probe_timer); - Defer defer([this] { --_spilling_task_count; }); - DCHECK_EQ(_spill_status_ok.load(), true); vectorized::Block block; bool eos = false; auto st = spilled_stream->read_next_block_sync(&block, &eos); if (!st.ok()) { - std::unique_lock lock(_spill_lock); _spill_status_ok = false; _spill_status = std::move(st); } else { @@ -501,16 +418,36 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti _dependency->set_ready(); }; + auto exception_catch_func = [read_func, mem_tracker, shared_state_holder, execution_context, + query_id, this]() { + SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); + std::shared_ptr execution_context_lock; + auto shared_state_sptr = shared_state_holder.lock(); + if (shared_state_sptr) { + execution_context_lock = execution_context.lock(); + } + if (!shared_state_sptr || !execution_context_lock) { + LOG(INFO) << "query: " << print_id(query_id) + << " execution_context released, maybe query was cancelled."; + return; + } + + auto status = [&]() { + RETURN_IF_CATCH_EXCEPTION(read_func()); + return Status::OK(); + }(); + + if (!status.ok()) { + _spill_status_ok = false; + _spill_status = std::move(status); + } + }; + auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); DCHECK(spill_io_pool != nullptr); _dependency->block(); has_data = true; - ++_spilling_task_count; - auto st = spill_io_pool->submit_func(read_func); - if (!st.ok()) { - --_spilling_task_count; - } - return st; + return spill_io_pool->submit_func(exception_catch_func); } PartitionedHashJoinProbeOperatorX::PartitionedHashJoinProbeOperatorX(ObjectPool* pool, @@ -692,15 +629,6 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, return local_state._spill_status; } - if (_should_revoke_memory(state)) { - bool wait_for_io = false; - RETURN_IF_ERROR((const_cast(this)) - ->_revoke_memory(state, wait_for_io)); - if (wait_for_io) { - return Status::OK(); - } - } - const auto partition_index = local_state._partition_cursor; auto& probe_blocks = local_state._probe_blocks[partition_index]; if (local_state._need_to_setup_internal_operators) { @@ -783,20 +711,8 @@ bool PartitionedHashJoinProbeOperatorX::need_data_from_children(RuntimeState* st size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state) const { auto& local_state = get_local_state(state); size_t mem_size = 0; - uint32_t spilling_start = local_state._child_eos ? local_state._partition_cursor + 1 : 0; - DCHECK_GE(spilling_start, local_state._partition_cursor); - - auto& partitioned_build_blocks = local_state._shared_state->partitioned_build_blocks; auto& probe_blocks = local_state._probe_blocks; - for (uint32_t i = spilling_start; i < _partition_count; ++i) { - auto& build_block = partitioned_build_blocks[i]; - if (build_block) { - auto block_bytes = build_block->allocated_bytes(); - if (block_bytes >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { - mem_size += build_block->allocated_bytes(); - } - } - + for (uint32_t i = 0; i < _partition_count; ++i) { for (auto& block : probe_blocks[i]) { mem_size += block.allocated_bytes(); } @@ -812,33 +728,12 @@ size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state return mem_size; } -Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, bool& wait_for_io) { +Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) { auto& local_state = get_local_state(state); - wait_for_io = false; - uint32_t spilling_start = local_state._child_eos ? local_state._partition_cursor + 1 : 0; - DCHECK_GE(spilling_start, local_state._partition_cursor); - - if (_partition_count > spilling_start) { - local_state._spilling_task_count = (_partition_count - spilling_start) * 2; - } else { - return Status::OK(); - } - VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe node: " << id() - << ", task: " << state->task_id() - << ", revoke memory, spill task count: " << local_state._spilling_task_count; - for (uint32_t i = spilling_start; i < _partition_count; ++i) { - RETURN_IF_ERROR(local_state.spill_build_block(state, i)); - RETURN_IF_ERROR(local_state.spill_probe_blocks(state, i)); - } + << ", task: " << state->task_id(); - if (local_state._spilling_task_count > 0) { - std::unique_lock lock(local_state._spill_lock); - if (local_state._spilling_task_count > 0) { - local_state._dependency->block(); - wait_for_io = true; - } - } + RETURN_IF_ERROR(local_state.spill_probe_blocks(state)); return Status::OK(); } @@ -874,11 +769,7 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori const auto need_to_spill = local_state._shared_state->need_to_spill; if (need_more_input_data(state)) { if (need_to_spill && _should_revoke_memory(state)) { - bool wait_for_io = false; - RETURN_IF_ERROR(_revoke_memory(state, wait_for_io)); - if (wait_for_io) { - return Status::OK(); - } + return _revoke_memory(state); } RETURN_IF_ERROR(_child_x->get_block_after_projects(state, local_state._child_block.get(), diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 3b942d15755d1cc..579c4ba841bd5cf 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -48,8 +48,7 @@ class PartitionedHashJoinProbeLocalState final Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; - Status spill_build_block(RuntimeState* state, uint32_t partition_index); - Status spill_probe_blocks(RuntimeState* state, uint32_t partition_index); + Status spill_probe_blocks(RuntimeState* state); Status recovery_build_blocks_from_disk(RuntimeState* state, uint32_t partition_index, bool& has_data); @@ -183,7 +182,7 @@ class PartitionedHashJoinProbeOperatorX final } private: - Status _revoke_memory(RuntimeState* state, bool& wait_for_io); + Status _revoke_memory(RuntimeState* state); friend class PartitionedHashJoinProbeLocalState; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index d253a519b0c7543..45ca975a88cd7da 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -122,30 +122,15 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta /// So, we need hold the pointer of shared state. std::weak_ptr shared_state_holder = _shared_state->shared_from_this(); - - _dependency->block(); auto query_id = state->query_id(); auto mem_tracker = state->get_query_ctx()->query_mem_tracker; - auto spill_func = [shared_state_holder, execution_context, - build_blocks = std::move(build_blocks), state, query_id, mem_tracker, - num_slots, this]() mutable { - SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); + auto spill_func = [build_blocks = std::move(build_blocks), state, num_slots, this]() mutable { Defer defer {[&]() { // need to reset build_block here, or else build_block will be destructed // after SCOPED_ATTACH_TASK_WITH_ID and will trigger memory_orphan_check failure build_blocks.clear(); }}; - std::shared_ptr execution_context_lock; - auto shared_state_sptr = shared_state_holder.lock(); - if (shared_state_sptr) { - execution_context_lock = execution_context.lock(); - } - if (!shared_state_sptr || !execution_context_lock || state->is_cancelled()) { - LOG(INFO) << "execution_context released, maybe query was canceled."; - return; - } - auto& p = _parent->cast(); auto& partitioned_blocks = _shared_state->partitioned_build_blocks; std::vector> partitions_indexes(p._partition_count); @@ -228,8 +213,36 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta _dependency->set_ready(); }; + + auto exception_catch_func = [spill_func, shared_state_holder, execution_context, state, + query_id, mem_tracker, this]() mutable { + SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); + std::shared_ptr execution_context_lock; + auto shared_state_sptr = shared_state_holder.lock(); + if (shared_state_sptr) { + execution_context_lock = execution_context.lock(); + } + if (!shared_state_sptr || !execution_context_lock || state->is_cancelled()) { + LOG(INFO) << "execution_context released, maybe query was canceled."; + return; + } + + auto status = [&]() { + RETURN_IF_CATCH_EXCEPTION(spill_func()); + return Status::OK(); + }(); + + if (!status.ok()) { + std::unique_lock lock(_spill_lock); + _spill_status = status; + _spill_status_ok = false; + _dependency->set_ready(); + } + }; auto* thread_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); - return thread_pool->submit_func(spill_func); + + _dependency->block(); + return thread_pool->submit_func(exception_catch_func); } Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { @@ -288,7 +301,18 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { } _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); SCOPED_TIMER(_spill_build_timer); - _spill_to_disk(i, spilling_stream); + + auto status = [&]() { + RETURN_IF_CATCH_EXCEPTION(_spill_to_disk(i, spilling_stream)); + return Status::OK(); + }(); + + if (!status.OK()) { + std::unique_lock lock(_spill_lock); + _dependency->set_ready(); + _spill_status_ok = false; + _spill_status = std::move(status); + } }); if (!st.ok()) { diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index c6a943c59b53ca4..004283841ec86b2 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -238,78 +238,83 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { MonotonicStopWatch submit_timer; submit_timer.start(); - status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( - [this, state, query_id, mem_tracker, shared_state_holder, &parent, execution_context, - submit_timer] { - SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); - std::shared_ptr execution_context_lock; - auto shared_state_sptr = shared_state_holder.lock(); - if (shared_state_sptr) { - execution_context_lock = execution_context.lock(); - } - if (!shared_state_sptr || !execution_context_lock) { - LOG(INFO) << "query " << print_id(query_id) - << " execution_context released, maybe query was cancelled."; - return Status::OK(); + auto spill_func = [this, state, query_id, &parent, submit_timer] { + _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); + Defer defer {[&]() { + if (!_shared_state->sink_status.ok() || state->is_cancelled()) { + if (!_shared_state->sink_status.ok()) { + LOG(WARNING) << "query " << print_id(query_id) << " sort node " << _parent->id() + << " revoke memory error: " << _shared_state->sink_status; } + _shared_state->close(); + } else { + VLOG_DEBUG << "query " << print_id(query_id) << " sort node " << _parent->id() + << " revoke memory finish"; + } - _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); - Defer defer {[&]() { - if (!_shared_state->sink_status.ok() || state->is_cancelled()) { - if (!_shared_state->sink_status.ok()) { - LOG(WARNING) << "query " << print_id(query_id) << " sort node " - << _parent->id() - << " revoke memory error: " << _shared_state->sink_status; - } - _shared_state->close(); - } else { - VLOG_DEBUG << "query " << print_id(query_id) << " sort node " - << _parent->id() << " revoke memory finish"; - } - - if (!_shared_state->sink_status.ok()) { - _shared_state->close(); - } - - _spilling_stream.reset(); - if (_eos) { - _dependency->set_ready_to_read(); - _finish_dependency->set_ready(); - } else { - _dependency->Dependency::set_ready(); - } - }}; - - _shared_state->sink_status = - parent._sort_sink_operator->prepare_for_spill(_runtime_state.get()); - RETURN_IF_ERROR(_shared_state->sink_status); - - auto* sink_local_state = _runtime_state->get_sink_local_state(); - update_profile(sink_local_state->profile()); - - bool eos = false; - vectorized::Block block; - while (!eos && !state->is_cancelled()) { - { - SCOPED_TIMER(_spill_merge_sort_timer); - _shared_state->sink_status = - parent._sort_sink_operator->merge_sort_read_for_spill( - _runtime_state.get(), &block, - _shared_state->spill_block_batch_row_count, &eos); - } - RETURN_IF_ERROR(_shared_state->sink_status); - { - SCOPED_TIMER(Base::_spill_timer); - _shared_state->sink_status = - _spilling_stream->spill_block(state, block, eos); - } - RETURN_IF_ERROR(_shared_state->sink_status); - block.clear_column_data(); - } - parent._sort_sink_operator->reset(_runtime_state.get()); + if (!_shared_state->sink_status.ok()) { + _shared_state->close(); + } + + _spilling_stream.reset(); + if (_eos) { + _dependency->set_ready_to_read(); + _finish_dependency->set_ready(); + } else { + _dependency->Dependency::set_ready(); + } + }}; + + _shared_state->sink_status = + parent._sort_sink_operator->prepare_for_spill(_runtime_state.get()); + RETURN_IF_ERROR(_shared_state->sink_status); + + auto* sink_local_state = _runtime_state->get_sink_local_state(); + update_profile(sink_local_state->profile()); + + bool eos = false; + vectorized::Block block; + while (!eos && !state->is_cancelled()) { + { + SCOPED_TIMER(_spill_merge_sort_timer); + _shared_state->sink_status = parent._sort_sink_operator->merge_sort_read_for_spill( + _runtime_state.get(), &block, _shared_state->spill_block_batch_row_count, + &eos); + } + RETURN_IF_ERROR(_shared_state->sink_status); + { + SCOPED_TIMER(Base::_spill_timer); + _shared_state->sink_status = _spilling_stream->spill_block(state, block, eos); + } + RETURN_IF_ERROR(_shared_state->sink_status); + block.clear_column_data(); + } + parent._sort_sink_operator->reset(_runtime_state.get()); - return Status::OK(); - }); + return Status::OK(); + }; + + auto exception_catch_func = [this, query_id, mem_tracker, shared_state_holder, + execution_context, spill_func]() { + SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); + std::shared_ptr execution_context_lock; + auto shared_state_sptr = shared_state_holder.lock(); + if (shared_state_sptr) { + execution_context_lock = execution_context.lock(); + } + if (!shared_state_sptr || !execution_context_lock) { + LOG(INFO) << "query " << print_id(query_id) + << " execution_context released, maybe query was cancelled."; + return; + } + + _shared_state->sink_status = [&]() { + RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); + }(); + }; + + status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( + exception_catch_func); if (!status.ok()) { if (!_eos) { Base::_dependency->Dependency::set_ready(); diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index fe6b4ee3efcd426..18a3d4310fda9c9 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -98,20 +98,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat MonotonicStopWatch submit_timer; submit_timer.start(); - auto spill_func = [this, state, query_id, mem_tracker, &parent, shared_state_holder, - execution_context, submit_timer] { - SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); - std::shared_ptr execution_context_lock; - auto shared_state_sptr = shared_state_holder.lock(); - if (shared_state_sptr) { - execution_context_lock = execution_context.lock(); - } - if (!shared_state_sptr || !execution_context_lock) { - LOG(INFO) << "query " << print_id(query_id) - << " execution_context released, maybe query was cancelled."; - return Status::OK(); - } - + auto spill_func = [this, state, query_id, &parent, submit_timer] { _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); SCOPED_TIMER(_spill_merge_sort_timer); Defer defer {[&]() { @@ -185,8 +172,26 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat } return Status::OK(); }; + + auto exception_catch_func = [this, query_id, mem_tracker, shared_state_holder, + execution_context, spill_func]() { + SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); + std::shared_ptr execution_context_lock; + auto shared_state_sptr = shared_state_holder.lock(); + if (shared_state_sptr) { + execution_context_lock = execution_context.lock(); + } + if (!shared_state_sptr || !execution_context_lock) { + LOG(INFO) << "query " << print_id(query_id) + << " execution_context released, maybe query was cancelled."; + return; + } + + _status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }(); + }; + return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( - spill_func); + exception_catch_func); } Status SpillSortLocalState::_create_intermediate_merger( diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index 83952411f46b25e..d7589f59f9fc7b1 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -672,68 +672,76 @@ Status StreamingAggLocalState::_pre_agg_with_serialized_key(doris::vectorized::B const bool used_too_much_memory = spill_streaming_agg_mem_limit > 0 && _memory_usage() > spill_streaming_agg_mem_limit; RETURN_IF_ERROR(std::visit( - [&](auto&& agg_method) -> Status { - auto& hash_tbl = *agg_method.hash_table; - /// If too much memory is used during the pre-aggregation stage, - /// it is better to output the data directly without performing further aggregation. - // do not try to do agg, just init and serialize directly return the out_block - if (used_too_much_memory || (hash_tbl.add_elem_size_overflow(rows) && - !_should_expand_preagg_hash_tables())) { - SCOPED_TIMER(_streaming_agg_timer); - ret_flag = true; - - // will serialize value data to string column. - // non-nullable column(id in `_make_nullable_keys`) - // will be converted to nullable. - bool mem_reuse = p._make_nullable_keys.empty() && out_block->mem_reuse(); - - std::vector data_types; - vectorized::MutableColumns value_columns; - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - auto data_type = - _aggregate_evaluators[i]->function()->get_serialized_type(); - if (mem_reuse) { - value_columns.emplace_back( - std::move(*out_block->get_by_position(i + key_size).column) - .mutate()); - } else { - // slot type of value it should always be string type - value_columns.emplace_back(_aggregate_evaluators[i] - ->function() - ->create_serialize_column()); + vectorized::Overload { + [&](std::monostate& arg) -> Status { + return Status::InternalError("Uninited hash table"); + }, + [&](auto& agg_method) -> Status { + auto& hash_tbl = *agg_method.hash_table; + /// If too much memory is used during the pre-aggregation stage, + /// it is better to output the data directly without performing further aggregation. + // do not try to do agg, just init and serialize directly return the out_block + if (used_too_much_memory || (hash_tbl.add_elem_size_overflow(rows) && + !_should_expand_preagg_hash_tables())) { + SCOPED_TIMER(_streaming_agg_timer); + ret_flag = true; + + // will serialize value data to string column. + // non-nullable column(id in `_make_nullable_keys`) + // will be converted to nullable. + bool mem_reuse = + p._make_nullable_keys.empty() && out_block->mem_reuse(); + + std::vector data_types; + vectorized::MutableColumns value_columns; + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + auto data_type = + _aggregate_evaluators[i]->function()->get_serialized_type(); + if (mem_reuse) { + value_columns.emplace_back( + std::move(*out_block->get_by_position(i + key_size) + .column) + .mutate()); + } else { + // slot type of value it should always be string type + value_columns.emplace_back(_aggregate_evaluators[i] + ->function() + ->create_serialize_column()); + } + data_types.emplace_back(data_type); + } + + for (int i = 0; i != _aggregate_evaluators.size(); ++i) { + SCOPED_TIMER(_serialize_data_timer); + RETURN_IF_ERROR( + _aggregate_evaluators[i]->streaming_agg_serialize_to_column( + in_block, value_columns[i], rows, + _agg_arena_pool.get())); + } + + if (!mem_reuse) { + vectorized::ColumnsWithTypeAndName columns_with_schema; + for (int i = 0; i < key_size; ++i) { + columns_with_schema.emplace_back( + key_columns[i]->clone_resized(rows), + _probe_expr_ctxs[i]->root()->data_type(), + _probe_expr_ctxs[i]->root()->expr_name()); + } + for (int i = 0; i < value_columns.size(); ++i) { + columns_with_schema.emplace_back(std::move(value_columns[i]), + data_types[i], ""); + } + out_block->swap(vectorized::Block(columns_with_schema)); + } else { + for (int i = 0; i < key_size; ++i) { + std::move(*out_block->get_by_position(i).column) + .mutate() + ->insert_range_from(*key_columns[i], 0, rows); + } + } } - data_types.emplace_back(data_type); - } - - for (int i = 0; i != _aggregate_evaluators.size(); ++i) { - SCOPED_TIMER(_serialize_data_timer); - RETURN_IF_ERROR(_aggregate_evaluators[i]->streaming_agg_serialize_to_column( - in_block, value_columns[i], rows, _agg_arena_pool.get())); - } - - if (!mem_reuse) { - vectorized::ColumnsWithTypeAndName columns_with_schema; - for (int i = 0; i < key_size; ++i) { - columns_with_schema.emplace_back( - key_columns[i]->clone_resized(rows), - _probe_expr_ctxs[i]->root()->data_type(), - _probe_expr_ctxs[i]->root()->expr_name()); - } - for (int i = 0; i < value_columns.size(); ++i) { - columns_with_schema.emplace_back(std::move(value_columns[i]), - data_types[i], ""); - } - out_block->swap(vectorized::Block(columns_with_schema)); - } else { - for (int i = 0; i < key_size; ++i) { - std::move(*out_block->get_by_position(i).column) - .mutate() - ->insert_range_from(*key_columns[i], 0, rows); - } - } - } - return Status::OK(); - }, + return Status::OK(); + }}, _agg_data->method_variant)); if (!ret_flag) { diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp index b9c27a9d6ae5836..0ac9f95563ecd0e 100644 --- a/be/src/vec/spill/spill_stream.cpp +++ b/be/src/vec/spill/spill_stream.cpp @@ -42,13 +42,17 @@ SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* d spill_dir_(std::move(spill_dir)), batch_rows_(batch_rows), batch_bytes_(batch_bytes), + query_id_(state->query_id()), profile_(profile) {} SpillStream::~SpillStream() { bool exists = false; auto status = io::global_local_filesystem()->exists(spill_dir_, &exists); if (status.ok() && exists) { - auto gc_dir = fmt::format("{}/{}/{}", get_data_dir()->path(), SPILL_GC_DIR_PREFIX, + auto query_dir = fmt::format("{}/{}/{}", get_data_dir()->path(), SPILL_GC_DIR_PREFIX, + print_id(query_id_)); + (void)io::global_local_filesystem()->create_directory(query_dir); + auto gc_dir = fmt::format("{}/{}", query_dir, std::filesystem::path(spill_dir_).filename().string()); (void)io::global_local_filesystem()->rename(spill_dir_, gc_dir); } @@ -62,7 +66,7 @@ Status SpillStream::prepare() { } const TUniqueId& SpillStream::query_id() const { - return state_->query_id(); + return query_id_; } const std::string& SpillStream::get_spill_root_dir() const { diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h index cadfa6fb6d46205..8751b406608bc96 100644 --- a/be/src/vec/spill/spill_stream.h +++ b/be/src/vec/spill/spill_stream.h @@ -40,6 +40,8 @@ class SpillStream { std::string spill_dir, size_t batch_rows, size_t batch_bytes, RuntimeProfile* profile); + SpillStream() = delete; + ~SpillStream(); int64_t id() const { return stream_id_; } @@ -99,6 +101,8 @@ class SpillStream { SpillWriterUPtr writer_; SpillReaderUPtr reader_; + TUniqueId query_id_; + RuntimeProfile* profile_ = nullptr; RuntimeProfile::Counter* write_wait_io_timer_ = nullptr; RuntimeProfile::Counter* read_wait_io_timer_ = nullptr;