diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 123097cfd53d165..eeed37907f9ebeb 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -239,10 +239,6 @@ class ExecNode {
 
     size_t children_count() const { return _children.size(); }
 
-    // when the fragment is normal finished, call this method to do some finish work
-    // such as send the last buffer to remote.
-    virtual Status try_close(RuntimeState* state) { return Status::OK(); }
-
 protected:
     friend class DataSink;
 
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 1e0f68131e87bbc..05d9c7292f758c6 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -64,6 +64,14 @@ bool ScanOperator::can_read() {
     }
 }
 
+bool ScanOperator::is_pending_finish() const {
+    return _node->_scanner_ctx && !_node->_scanner_ctx->no_schedule();
+}
+
+Status ScanOperator::try_close(RuntimeState* state) {
+    return _node->try_close(state);
+}
+
 bool ScanOperator::runtime_filters_are_ready_or_timeout() {
     return _node->runtime_filters_are_ready_or_timeout();
 }
@@ -73,8 +81,9 @@ std::string ScanOperator::debug_string() const {
     fmt::format_to(debug_string_buffer, "{}, scanner_ctx is null: {} ",
                    SourceOperator::debug_string(), _node->_scanner_ctx == nullptr);
     if (_node->_scanner_ctx) {
-        fmt::format_to(debug_string_buffer, ", num_running_scanners = {}",
-                       _node->_scanner_ctx->get_num_running_scanners());
+        fmt::format_to(debug_string_buffer, ", num_running_scanners = {}, num_scheduling_ctx = {} ",
+                       _node->_scanner_ctx->get_num_running_scanners(),
+                       _node->_scanner_ctx->get_num_scheduling_ctx());
     }
     return fmt::to_string(debug_string_buffer);
 }
@@ -92,6 +101,9 @@ std::string ScanOperator::debug_string() const {
 template <typename Derived>
 ScanLocalState<Derived>::ScanLocalState(RuntimeState* state, OperatorXBase* parent)
         : ScanLocalStateBase(state, parent) {
+    _finish_dependency = std::make_shared<FinishDependency>(
+            parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
+            state->get_query_ctx());
     _filter_dependency = std::make_shared<RuntimeFilterDependency>(
             parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
             state->get_query_ctx());
@@ -165,6 +177,7 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
     auto status = _eos ? Status::OK() : _prepare_scanners();
     if (_scanner_ctx) {
+        _finish_dependency->block();
         DCHECK(!_eos && _num_scanners->value() > 0);
         RETURN_IF_ERROR(_scanner_ctx->init());
         RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx));
@@ -557,14 +570,15 @@ std::string ScanLocalState<Derived>::debug_string(int indentation_level) const {
                    PipelineXLocalState<>::debug_string(indentation_level), _eos.load());
     if (_scanner_ctx) {
         fmt::format_to(debug_string_buffer, "");
-        fmt::format_to(debug_string_buffer,
-                       ", Scanner Context: (_is_finished = {}, _should_stop = {}, "
-                       "_num_running_scanners={}, "
-                       " _num_unfinished_scanners = {}, status = {}, error = {})",
-                       _scanner_ctx->is_finished(), _scanner_ctx->should_stop(),
-                       _scanner_ctx->get_num_running_scanners(),
-                       _scanner_ctx->get_num_unfinished_scanners(),
-                       _scanner_ctx->status().to_string(), _scanner_ctx->status_error());
+        fmt::format_to(
+                debug_string_buffer,
+                ", Scanner Context: (_is_finished = {}, _should_stop = {}, "
+                "_num_running_scanners={}, "
+                "_num_scheduling_ctx = {}, _num_unfinished_scanners = {}, status = {}, error = {})",
+                _scanner_ctx->is_finished(), _scanner_ctx->should_stop(),
+                _scanner_ctx->get_num_running_scanners(), _scanner_ctx->get_num_scheduling_ctx(),
+                _scanner_ctx->get_num_unfinished_scanners(), _scanner_ctx->status().to_string(),
+                _scanner_ctx->status_error());
     }
 
     return fmt::to_string(debug_string_buffer);
@@ -1212,27 +1226,24 @@ template <typename Derived>
 Status ScanLocalState<Derived>::_prepare_scanners() {
     std::list<VScannerSPtr> scanners;
     RETURN_IF_ERROR(_init_scanners(&scanners));
-    // Init scanner wrapper
-    for (auto it = scanners.begin(); it != scanners.end(); ++it) {
-        _scanners.emplace_back(std::make_shared<ScannerDelegate>(*it));
-    }
     if (scanners.empty()) {
         _eos = true;
         _scan_dependency->set_ready();
     } else {
         COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
-        RETURN_IF_ERROR(_start_scanners(_scanners));
+        RETURN_IF_ERROR(_start_scanners(scanners));
     }
     return Status::OK();
 }
 
 template <typename Derived>
 Status ScanLocalState<Derived>::_start_scanners(
-        const std::list<std::shared_ptr<ScannerDelegate>>& scanners) {
+        const std::list<VScannerSPtr>& scanners) {
     auto& p = _parent->cast<typename Derived::Parent>();
     _scanner_ctx = PipScannerContext::create_shared(state(), this, p._output_tuple_desc, scanners,
                                                     p.limit(), state()->scan_queue_mem_limit(),
-                                                    p._col_distribute_ids, 1, _scan_dependency);
+                                                    p._col_distribute_ids, 1, _scan_dependency,
+                                                    _finish_dependency);
     return Status::OK();
 }
 
@@ -1308,6 +1319,9 @@ Status ScanLocalState<Derived>::_init_profile() {
     _max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT);
 
+    _wait_for_finish_dependency_timer =
+            ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency");
+
     return Status::OK();
 }
 
@@ -1415,6 +1429,17 @@ Status ScanOperatorX<LocalStateType>::open(RuntimeState* state) {
     return Status::OK();
 }
 
+template <typename LocalStateType>
+Status ScanOperatorX<LocalStateType>::try_close(RuntimeState* state) {
+    auto& local_state = get_local_state(state);
+    if (local_state._scanner_ctx) {
+        // Mark this scanner ctx as should_stop to make sure scanners will not be scheduled anymore.
+        // TODO: the lock taken in `set_should_stop` may cause a slight performance impact.
+        local_state._scanner_ctx->set_should_stop();
+    }
+    return Status::OK();
+}
+
 template <typename Derived>
 Status ScanLocalState<Derived>::close(RuntimeState* state) {
     if (_closed) {
@@ -1426,9 +1451,10 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     if (_scanner_ctx) {
-        _scanner_ctx->stop_scanners(state);
+        _scanner_ctx->clear_and_join(reinterpret_cast<ScanLocalState<Derived>*>(this), state);
     }
     COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time());
+    COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
     COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());
 
     return PipelineXLocalState<>::close(state);
@@ -1485,7 +1511,7 @@ Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized::Block* block,
     if (eos) {
         source_state = SourceState::FINISHED;
         // reach limit, stop the scanners.
-        local_state._scanner_ctx->stop_scanners(state);
+        local_state._scanner_ctx->set_should_stop();
     }
 
     return Status::OK();
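Taken together, the operator-side changes implement a two-phase shutdown: try_close only marks the scanner context as stopped, is_pending_finish keeps the pipeline task alive while scanners are still running or scheduled, and close joins them. A minimal standalone sketch of that contract, using invented Mini* types rather than the real Doris classes:

#include <atomic>

// Hypothetical stand-ins for ScannerContext / ScanOperator, for illustration only.
struct MiniScannerContext {
    std::atomic<bool> should_stop {false};
    std::atomic<int> num_running {0};    // scanners currently executing
    std::atomic<int> num_scheduling {0}; // context entries queued in the scheduler

    // Mirrors ScannerContext::no_schedule(): nothing left in flight.
    bool no_schedule() const { return num_running == 0 && num_scheduling == 0; }
};

struct MiniScanOperator {
    MiniScannerContext* ctx = nullptr;

    // Phase 1: stop producing new work; must not block.
    void try_close() {
        if (ctx != nullptr) ctx->should_stop = true;
    }
    // Polled by the executor; the task is destroyed only once this returns false.
    bool is_pending_finish() const { return ctx != nullptr && !ctx->no_schedule(); }
};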
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index bf083d82d5d89cd..3690e9eb39ca8ed 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -31,9 +31,6 @@ namespace doris {
 class ExecNode;
 } // namespace doris
 
-namespace doris::vectorized {
-class ScannerDelegate;
-}
 namespace doris::pipeline {
 
 class PipScannerContext;
@@ -51,9 +48,13 @@ class ScanOperator : public SourceOperator {
     bool can_read() override; // for source
 
+    bool is_pending_finish() const override;
+
     bool runtime_filters_are_ready_or_timeout() override;
 
     std::string debug_string() const override;
+
+    Status try_close(RuntimeState* state) override;
 };
 
 class ScanDependency final : public Dependency {
@@ -170,6 +171,7 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public vectorized::RuntimeFilterConsumer {
     RuntimeProfile::Counter* _wait_for_scanner_done_timer = nullptr;
     // time of prefilter input block from scanner
     RuntimeProfile::Counter* _wait_for_eos_timer = nullptr;
+    RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
     RuntimeProfile::Counter* _wait_for_rf_timer = nullptr;
 };
 
@@ -212,6 +214,7 @@ class ScanLocalState : public ScanLocalStateBase {
     Dependency* dependency() override { return _scan_dependency.get(); }
 
     RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); };
+    Dependency* finishdependency() override { return _finish_dependency.get(); }
 
 protected:
     template <typename LocalStateType>
@@ -347,7 +350,7 @@ class ScanLocalState : public ScanLocalStateBase {
     Status _prepare_scanners();
 
     // Submit the scanner to the thread pool and start execution
-    Status _start_scanners(const std::list<std::shared_ptr<ScannerDelegate>>& scanners);
+    Status _start_scanners(const std::list<VScannerSPtr>& scanners);
 
     // For some conjuncts there is a chance to eliminate the cast operator.
     // Eg. a Variant's sub-column could eliminate the cast in the storage layer if
@@ -410,13 +413,14 @@ class ScanLocalState : public ScanLocalStateBase {
 
     std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
 
-    // ScanLocalState owns the ownership of scanner, scanner context only has its weakptr
-    std::list<std::shared_ptr<ScannerDelegate>> _scanners;
+    std::shared_ptr<Dependency> _finish_dependency;
 };
 
 template <typename LocalStateType>
 class ScanOperatorX : public OperatorX<LocalStateType> {
 public:
+    Status try_close(RuntimeState* state) override;
+
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override { return OperatorXBase::prepare(state); }
     Status open(RuntimeState* state) override;
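The _finish_dependency declared above follows the usual pipelineX dependency protocol: open() calls block() once scanners are in flight, and the context calls set_ready() when it has fully drained (see set_ready_to_finish() in scanner_context.cpp further down). A rough sketch of that protocol with an invented MiniDependency, not the real Dependency API:

#include <atomic>

// Invented stand-in for the pipelineX Dependency class, for illustration only.
class MiniDependency {
public:
    void block() { _ready.store(false); }    // the task must now wait on this dependency
    void set_ready() { _ready.store(true); } // unblocks the waiting pipeline task
    bool ready() const { return _ready.load(); }

private:
    std::atomic<bool> _ready {true};
};

// Lifecycle mirroring the scan operator (assumed semantics):
//   open():   finish_dep.block();                            // scanners submitted
//   drain:    if (done && nothing in flight) finish_dep.set_ready();
//   executor: frees the task only after finish_dep.ready() is true again.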
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index 56ceb20bf1502e3..309aed96a8cf7f1 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -31,9 +31,9 @@ class PipScannerContext : public vectorized::ScannerContext {
 public:
     PipScannerContext(RuntimeState* state, vectorized::VScanNode* parent,
                       const TupleDescriptor* output_tuple_desc,
-                      const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
-                      int64_t limit_, int64_t max_bytes_in_blocks_queue,
-                      const std::vector<int>& col_distribute_ids, const int num_parallel_instances)
+                      const std::list<vectorized::VScannerSPtr>& scanners, int64_t limit_,
+                      int64_t max_bytes_in_blocks_queue, const std::vector<int>& col_distribute_ids,
+                      const int num_parallel_instances)
         : vectorized::ScannerContext(state, parent, output_tuple_desc, scanners, limit_,
                                      max_bytes_in_blocks_queue, num_parallel_instances),
           _col_distribute_ids(col_distribute_ids),
@@ -41,13 +41,14 @@ class PipScannerContext : public vectorized::ScannerContext {
 
     PipScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
                       const TupleDescriptor* output_tuple_desc,
-                      const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
-                      int64_t limit_, int64_t max_bytes_in_blocks_queue,
-                      const std::vector<int>& col_distribute_ids, const int num_parallel_instances,
-                      std::shared_ptr<ScanDependency> dependency)
+                      const std::list<vectorized::VScannerSPtr>& scanners, int64_t limit_,
+                      int64_t max_bytes_in_blocks_queue, const std::vector<int>& col_distribute_ids,
+                      const int num_parallel_instances,
+                      std::shared_ptr<ScanDependency> dependency,
+                      std::shared_ptr<Dependency> finish_dependency)
         : vectorized::ScannerContext(state, output_tuple_desc, scanners, limit_,
                                      max_bytes_in_blocks_queue, num_parallel_instances,
-                                     local_state, dependency),
+                                     local_state, dependency, finish_dependency),
           _need_colocate_distribute(false) {}
 
     Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
@@ -110,6 +111,9 @@ class PipScannerContext : public vectorized::ScannerContext {
         return Status::OK();
     }
 
+    // We should make these methods lock free.
+    bool done() override { return _is_finished || _should_stop; }
+
     void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) override {
         const int queue_size = _blocks_queues.size();
         const int block_size = blocks.size();
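The done() override above deliberately reads _is_finished and _should_stop without taking _transfer_lock, which is what the "lock free" comment is asking for. A sketch of the idea, under the assumption that both flags are std::atomic<bool> (that assumption is what makes the lock-free read sound):

#include <atomic>

// Sketch: a hot-path completion check over two atomic flags needs no mutex.
struct MiniContext {
    std::atomic<bool> is_finished {false};
    std::atomic<bool> should_stop {false};

    // Each load is individually atomic. The pair may race with a concurrent
    // writer, but a stale 'false' only delays shutdown by one scheduling
    // round, which is harmless here.
    bool done() const {
        return is_finished.load(std::memory_order_acquire) ||
               should_stop.load(std::memory_order_acquire);
    }
};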
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 908b2a663b7eb08..5ad2dbec5b69fac 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -46,11 +46,11 @@ namespace doris::vectorized {
 
 using namespace std::chrono_literals;
 
 ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc,
-                               const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
-                               int64_t limit_, int64_t max_bytes_in_blocks_queue,
-                               const int num_parallel_instances,
+                               const std::list<VScannerSPtr>& scanners, int64_t limit_,
+                               int64_t max_bytes_in_blocks_queue, const int num_parallel_instances,
                                pipeline::ScanLocalStateBase* local_state,
-                               std::shared_ptr<pipeline::ScanDependency> dependency)
+                               std::shared_ptr<pipeline::ScanDependency> dependency,
+                               std::shared_ptr<pipeline::Dependency> finish_dependency)
         : _state(state),
           _parent(nullptr),
           _local_state(local_state),
@@ -61,10 +61,11 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc,
           _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) *
                               num_parallel_instances),
           _scanner_scheduler(state->exec_env()->scanner_scheduler()),
-          _scanners(scanners.begin(), scanners.end()),
-          _all_scanners(scanners.begin(), scanners.end()),
+          _scanners(scanners),
+          _scanners_ref(scanners.begin(), scanners.end()),
           _num_parallel_instances(num_parallel_instances),
-          _dependency(dependency) {
+          _dependency(dependency),
+          _finish_dependency(finish_dependency) {
     // Use the task exec context as a lock between scanner threads and fragment execution threads
     _task_exec_ctx = _state->get_task_execution_context();
     _query_id = _state->get_query_ctx()->query_id();
@@ -91,9 +92,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VScanNode* parent,
 
 ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VScanNode* parent,
                                const doris::TupleDescriptor* output_tuple_desc,
-                               const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
-                               int64_t limit_, int64_t max_bytes_in_blocks_queue,
-                               const int num_parallel_instances,
+                               const std::list<VScannerSPtr>& scanners, int64_t limit_,
+                               int64_t max_bytes_in_blocks_queue, const int num_parallel_instances,
                                pipeline::ScanLocalStateBase* local_state)
         : _state(state),
           _parent(parent),
@@ -105,8 +105,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VScanNode* parent,
           _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) *
                               num_parallel_instances),
           _scanner_scheduler(state->exec_env()->scanner_scheduler()),
-          _scanners(scanners.begin(), scanners.end()),
-          _all_scanners(scanners.begin(), scanners.end()),
+          _scanners(scanners),
+          _scanners_ref(scanners.begin(), scanners.end()),
           _num_parallel_instances(num_parallel_instances) {
     // Use the task exec context as a lock between scanner threads and fragment execution threads
     _task_exec_ctx = _state->get_task_execution_context();
@@ -182,6 +182,10 @@ Status ScannerContext::init() {
     }
 #endif
 
+    // 4. This ctx will be submitted to the scanner scheduler right after init,
+    //    so set _num_scheduling_ctx to 1 here.
+    _num_scheduling_ctx = 1;
+
     _num_unfinished_scanners = _scanners.size();
 
     if (_parent) {
@@ -271,9 +275,11 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block,
     bool is_scheduled = false;
     if (!done() && to_be_schedule && _num_running_scanners == 0) {
         is_scheduled = true;
-        auto submit_status = _scanner_scheduler->submit(shared_from_this());
-        if (!submit_status.ok()) {
-            set_status_on_error(submit_status, false);
+        auto state = _scanner_scheduler->submit(shared_from_this());
+        if (state.ok()) {
+            _num_scheduling_ctx++;
+        } else {
+            set_status_on_error(state, false);
         }
     }
 
@@ -364,17 +370,41 @@ Status ScannerContext::validate_block_schema(Block* block) {
     return Status::OK();
 }
 
+void ScannerContext::set_should_stop() {
+    std::lock_guard l(_transfer_lock);
+    _should_stop = true;
+    _set_scanner_done();
+    for (const VScannerWPtr& scanner : _scanners_ref) {
+        if (VScannerSPtr sc = scanner.lock()) {
+            sc->try_stop();
+        }
+    }
+    _blocks_queue_added_cv.notify_one();
+    set_ready_to_finish();
+}
+
 void ScannerContext::inc_num_running_scanners(int32_t inc) {
     std::lock_guard l(_transfer_lock);
     _num_running_scanners += inc;
 }
 
-void ScannerContext::dec_num_running_scanners(int32_t scanner_dec) {
+void ScannerContext::dec_num_scheduling_ctx() {
     std::lock_guard l(_transfer_lock);
-    _num_running_scanners -= scanner_dec;
+    _num_scheduling_ctx--;
+    set_ready_to_finish();
+    if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
+        _ctx_finish_cv.notify_one();
+    }
+}
+
+void ScannerContext::set_ready_to_finish() {
+    // `_should_stop == true` means this task has already ended and is now waiting for pending finish.
+    if (_finish_dependency && done() && _num_running_scanners == 0 && _num_scheduling_ctx == 0) {
+        _finish_dependency->set_ready();
+    }
 }
 
-void ScannerContext::set_status_on_error(const Status& status, bool need_lock) {
+bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) {
     std::unique_lock l(_transfer_lock, std::defer_lock);
     if (need_lock) {
         l.lock();
@@ -385,20 +415,14 @@ bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) {
         _blocks_queue_added_cv.notify_one();
         _should_stop = true;
         _set_scanner_done();
+        return true;
     }
+    return false;
 }
 
-void ScannerContext::stop_scanners(RuntimeState* state) {
-    std::unique_lock l(_transfer_lock);
-    _should_stop = true;
-    _set_scanner_done();
-    for (const std::weak_ptr<ScannerDelegate>& scanner : _all_scanners) {
-        if (std::shared_ptr<ScannerDelegate> sc = scanner.lock()) {
-            sc->_scanner->try_stop();
-        }
-    }
-    _blocks_queue.clear();
-    // TODO yiguolei, call mark close to scanners
+template <typename Parent>
+Status ScannerContext::_close_and_clear_scanners(Parent* parent, RuntimeState* state) {
+    std::unique_lock l(_scanners_lock);
     if (state->enable_profile()) {
         std::stringstream scanner_statistics;
         std::stringstream scanner_rows_read;
         std::stringstream scanner_wait_worker_time;
         scanner_statistics << "[";
         scanner_rows_read << "[";
         scanner_wait_worker_time << "[";
-        // Scanners can in 3 state
-        // state 1: in scanner context, not scheduled
-        // state 2: in scanner worker pool's queue, scheduled but not running
-        // state 3: scanner is running.
-        for (auto& scanner_ref : _all_scanners) {
-            auto scanner = scanner_ref.lock();
-            if (scanner == nullptr) {
-                continue;
-            }
+        for (auto finished_scanner_time : _finished_scanner_runtime) {
+            scanner_statistics << PrettyPrinter::print(finished_scanner_time, TUnit::TIME_NS)
+                               << ", ";
+        }
+        for (auto finished_scanner_rows : _finished_scanner_rows_read) {
+            scanner_rows_read << PrettyPrinter::print(finished_scanner_rows, TUnit::UNIT) << ", ";
+        }
+        for (auto finished_scanner_wait_time : _finished_scanner_wait_worker_time) {
+            scanner_wait_worker_time
+                    << PrettyPrinter::print(finished_scanner_wait_time, TUnit::TIME_NS) << ", ";
+        }
+        // Only unfinished scanners are left here.
+        for (auto& scanner : _scanners) {
+            // Scanners are in the ObjPool of the ScanNode,
+            // so there is no need to delete them here.
             // Add per-scanner running time before closing them
-            scanner_statistics << PrettyPrinter::print(scanner->_scanner->get_time_cost_ns(),
-                                                       TUnit::TIME_NS)
+            scanner_statistics << PrettyPrinter::print(scanner->get_time_cost_ns(), TUnit::TIME_NS)
                                << ", ";
-            scanner_rows_read << PrettyPrinter::print(scanner->_scanner->get_rows_read(),
-                                                      TUnit::UNIT)
+            scanner_rows_read << PrettyPrinter::print(scanner->get_rows_read(), TUnit::UNIT)
                               << ", ";
             scanner_wait_worker_time
-                    << PrettyPrinter::print(scanner->_scanner->get_scanner_wait_worker_timer(),
+                    << PrettyPrinter::print(scanner->get_scanner_wait_worker_timer(),
                                             TUnit::TIME_NS)
                     << ", ";
-            // since there are all scanners, some scanners is running, so that could not call scanner
-            // close here.
         }
         scanner_statistics << "]";
         scanner_rows_read << "]";
         scanner_wait_worker_time << "]";
-        _scanner_profile->add_info_string("PerScannerRunningTime", scanner_statistics.str());
-        _scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str());
-        _scanner_profile->add_info_string("PerScannerWaitTime", scanner_wait_worker_time.str());
+        parent->scanner_profile()->add_info_string("PerScannerRunningTime",
+                                                   scanner_statistics.str());
+        parent->scanner_profile()->add_info_string("PerScannerRowsRead", scanner_rows_read.str());
+        parent->scanner_profile()->add_info_string("PerScannerWaitTime",
+                                                   scanner_wait_worker_time.str());
+    }
+    // Only unfinished scanners are left here.
+    for (auto& scanner : _scanners) {
+        static_cast<void>(scanner->close(state));
+        // Scanners are in the ObjPool of the ScanNode,
+        // so there is no need to delete them here.
     }
+    _scanners.clear();
+    return Status::OK();
+}
 
-    _blocks_queue_added_cv.notify_one();
+template <typename Parent>
+void ScannerContext::clear_and_join(Parent* parent, RuntimeState* state) {
+    std::unique_lock l(_transfer_lock);
+    do {
+        if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
+            break;
+        } else {
+            DCHECK(!state->enable_pipeline_exec())
+                    << " _num_running_scanners: " << _num_running_scanners
+                    << " _num_scheduling_ctx: " << _num_scheduling_ctx;
+            while (!(_num_running_scanners == 0 && _num_scheduling_ctx == 0)) {
+                _ctx_finish_cv.wait(l);
+            }
+            break;
+        }
+    } while (false);
+    // Must wait until all running scanners have stopped,
+    // so that we can make sure to close all scanners.
+    static_cast<void>(_close_and_clear_scanners(parent, state));
+
+    _blocks_queue.clear();
+}
+
+bool ScannerContext::no_schedule() {
+    std::unique_lock l(_transfer_lock);
+    return _num_running_scanners == 0 && _num_scheduling_ctx == 0;
 }
 
 void ScannerContext::_set_scanner_done() {
@@ -450,11 +512,12 @@ std::string ScannerContext::debug_string() {
     return fmt::format(
             "id: {}, scanners: {}, blocks in queue: {},"
             " status: {}, _should_stop: {}, _is_finished: {}, free blocks: {},"
-            " limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
+            " limit: {}, _num_running_scanners: {}, _num_scheduling_ctx: {}, _max_thread_num: {},"
             " _block_per_scanner: {}, _cur_bytes_in_queue: {}, MAX_BYTE_OF_QUEUE: {}",
-            ctx_id, _scanners.size(), _blocks_queue.size(), status().ok(), _should_stop,
-            _is_finished, _free_blocks.size_approx(), limit, _num_running_scanners, _max_thread_num,
-            _block_per_scanner, _cur_bytes_in_queue, _max_bytes_in_queue);
+            ctx_id, _scanners.size(), _blocks_queue.size(), status().ok(), _should_stop,
+            _is_finished, _free_blocks.size_approx(), limit, _num_running_scanners,
+            _num_scheduling_ctx, _max_thread_num, _block_per_scanner, _cur_bytes_in_queue,
+            _max_bytes_in_queue);
 }
 
 void ScannerContext::reschedule_scanner_ctx() {
     std::lock_guard l(_transfer_lock);
     if (done()) {
         return;
     }
-    auto submit_status = _scanner_scheduler->submit(shared_from_this());
+    auto state = _scanner_scheduler->submit(shared_from_this());
     //todo(wb) rethinking is it better to mark current scan_context failed when submit failed many times?
-    if (!submit_status.ok()) {
-        set_status_on_error(submit_status, false);
+    if (state.ok()) {
+        _num_scheduling_ctx++;
+    } else {
+        set_status_on_error(state, false);
     }
 }
 
-void ScannerContext::push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate> scanner) {
-    std::lock_guard l(_transfer_lock);
-    // Use a transfer lock to avoid the scanner be scheduled concurrently. For example, that after
-    // calling "_scanners.push_front(scanner)", there may be other ctx in scheduler
-    // to schedule that scanner right away, and in that schedule run, the scanner may be marked as closed
-    // before we call the following if() block.
-    if (scanner->_scanner->need_to_close()) {
-        --_num_unfinished_scanners;
-        if (_num_unfinished_scanners == 0) {
-            _dispose_coloate_blocks_not_in_queue();
-            _is_finished = true;
-            _set_scanner_done();
-            _blocks_queue_added_cv.notify_one();
-            return;
-        }
+void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
+    {
+        std::unique_lock l(_scanners_lock);
+        _scanners.push_front(scanner);
     }
+    std::lock_guard l(_transfer_lock);
 
-    _scanners.push_front(scanner);
+    // In the pipeline engine, Doris closes the scanners once `no_schedule` becomes true.
+    // We have to decrease _num_running_scanners before rescheduling; otherwise
+    // the reschedule does not work because of _num_running_scanners.
+    _num_running_scanners--;
+    set_ready_to_finish();
 
-    if (should_be_scheduled()) {
-        auto submit_status = _scanner_scheduler->submit(shared_from_this());
-        if (!submit_status.ok()) {
-            set_status_on_error(submit_status, false);
+    if (!done() && should_be_scheduled()) {
+        auto state = _scanner_scheduler->submit(shared_from_this());
+        if (state.ok()) {
+            _num_scheduling_ctx++;
+        } else {
+            set_status_on_error(state, false);
         }
     }
+
+    // Notice that after calling "_scanners.push_front(scanner)", there may be another ctx in the
+    // scheduler that schedules that scanner right away, and in that schedule run the scanner may
+    // be marked as closed before we reach the following if() block.
+    // So we need "scanner->set_counted_down()" to avoid "_num_unfinished_scanners" being
+    // decreased twice by the same scanner.
+    if (scanner->need_to_close() && scanner->set_counted_down() &&
+        (--_num_unfinished_scanners) == 0) {
+        _dispose_coloate_blocks_not_in_queue();
+        _is_finished = true;
+        _set_scanner_done();
+        _blocks_queue_added_cv.notify_one();
+    }
+    _ctx_finish_cv.notify_one();
 }
 
-// This method is called in scanner scheduler, and task context is hold
-void ScannerContext::get_next_batch_of_scanners(
-        std::list<std::weak_ptr<ScannerDelegate>>* current_run) {
-    std::lock_guard l(_transfer_lock);
-    // Update the sched counter for profile
-    Defer defer {[&]() { _scanner_sched_counter->update(current_run->size()); }};
+void ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current_run) {
     // 1. Calculate how many scanners should be scheduled at this run.
-    // If there are enough space in blocks queue,
-    // the scanner number depends on the _free_blocks numbers
-    int thread_slot_num = get_available_thread_slot_num();
+    int thread_slot_num = 0;
+    {
+        // If there is enough space in the blocks queue,
+        // the scanner number depends on the number of _free_blocks.
+        thread_slot_num = get_available_thread_slot_num();
+    }
 
     // 2. get #thread_slot_num scanners from ctx->scanners
     //    and put them into "this_run".
-    for (int i = 0; i < thread_slot_num && !_scanners.empty();) {
-        std::weak_ptr<ScannerDelegate> scanner_ref = _scanners.front();
-        std::shared_ptr<ScannerDelegate> scanner = scanner_ref.lock();
-        _scanners.pop_front();
-        if (scanner == nullptr) {
-            continue;
-        }
-        if (scanner->_scanner->need_to_close()) {
-            static_cast<void>(scanner->_scanner->close(_state));
-        } else {
-            current_run->push_back(scanner_ref);
-            i++;
+    {
+        std::unique_lock l(_scanners_lock);
+        for (int i = 0; i < thread_slot_num && !_scanners.empty();) {
+            VScannerSPtr scanner = _scanners.front();
+            _scanners.pop_front();
+            if (scanner->need_to_close()) {
+                _finished_scanner_runtime.push_back(scanner->get_time_cost_ns());
+                _finished_scanner_rows_read.push_back(scanner->get_rows_read());
+                _finished_scanner_wait_worker_time.push_back(
+                        scanner->get_scanner_wait_worker_timer());
+                static_cast<void>(scanner->close(_state));
+            } else {
+                current_run->push_back(scanner);
+                i++;
+            }
         }
     }
 }
 
+template void ScannerContext::clear_and_join(pipeline::ScanLocalStateBase* parent,
+                                             RuntimeState* state);
+template void ScannerContext::clear_and_join(VScanNode* parent, RuntimeState* state);
+
 } // namespace doris::vectorized
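clear_and_join above is a counter-plus-condition-variable join: workers decrement _num_running_scanners / _num_scheduling_ctx under _transfer_lock and notify _ctx_finish_cv, and the closing thread waits until both counters reach zero before closing the remaining scanners. Reduced to its essentials (hypothetical names, same shape):

#include <condition_variable>
#include <mutex>

// Minimal join primitive in the shape of ScannerContext::clear_and_join.
class MiniJoin {
public:
    // Called when a scanner thread or a scheduled ctx run starts.
    void inc_in_flight() {
        std::lock_guard<std::mutex> l(_lock);
        ++_in_flight;
    }
    // Called when a scanner thread or a scheduled ctx run finishes.
    void dec_in_flight() {
        std::lock_guard<std::mutex> l(_lock);
        if (--_in_flight == 0) {
            _cv.notify_one();
        }
    }
    // Called once by the closing thread; returns when nothing is in flight.
    // After this, it is safe to close and clear all scanners.
    void join() {
        std::unique_lock<std::mutex> l(_lock);
        _cv.wait(l, [this] { return _in_flight == 0; });
    }

private:
    std::mutex _lock;
    std::condition_variable _cv;
    int _in_flight = 0;
};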
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index e320eb55b2e19c8..ba9c1fdee10a5bd 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -53,7 +53,6 @@ class TaskGroup;
 
 namespace vectorized {
 
 class VScanner;
-class ScannerDelegate;
 class VScanNode;
 class ScannerScheduler;
 class SimplifiedScanScheduler;
@@ -71,7 +70,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext> {
 public:
     ScannerContext(RuntimeState* state, VScanNode* parent,
                    const TupleDescriptor* output_tuple_desc,
-                   const std::list<std::shared_ptr<ScannerDelegate>>& scanners, int64_t limit_,
+                   const std::list<VScannerSPtr>& scanners, int64_t limit_,
                    int64_t max_bytes_in_blocks_queue, const int num_parallel_instances = 1,
                    pipeline::ScanLocalStateBase* local_state = nullptr);
 
@@ -93,9 +92,9 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext> {
     // When a scanner completes a scan, this method will be called
     // to return the scanner to the list for the next scheduling.
-    void push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate> scanner);
+    void push_back_scanner_and_reschedule(VScannerSPtr scanner);
 
-    void set_status_on_error(const Status& status, bool need_lock = true);
+    bool set_status_on_error(const Status& status, bool need_lock = true);
 
     Status status() {
         if (_process_status.is<ErrorCode::END_OF_FILE>()) {
             return Status::OK();
         }
         return _process_status;
     }
 
+    // Called by ScanNode.
+    // Used to notify the scheduler that this ScannerContext can stop working.
+    void set_should_stop();
+
     // Return true if this ScannerContext needs no more processing
-    bool done() const { return _is_finished || _should_stop; }
+    virtual bool done() { return _is_finished || _should_stop; }
 
     bool is_finished() { return _is_finished.load(); }
     bool should_stop() { return _should_stop.load(); }
     bool status_error() { return _status_error.load(); }
 
     void inc_num_running_scanners(int32_t scanner_inc);
 
-    void dec_num_running_scanners(int32_t scanner_dec);
+    void set_ready_to_finish();
 
     int get_num_running_scanners() const { return _num_running_scanners; }
 
     int get_num_unfinished_scanners() const { return _num_unfinished_scanners; }
 
-    void get_next_batch_of_scanners(std::list<std::weak_ptr<ScannerDelegate>>* current_run);
+    void dec_num_scheduling_ctx();
+
+    int get_num_scheduling_ctx() const { return _num_scheduling_ctx; }
+
+    void get_next_batch_of_scanners(std::list<VScannerSPtr>* current_run);
+
+    template <typename Parent>
+    void clear_and_join(Parent* parent, RuntimeState* state);
+
+    bool no_schedule();
 
     virtual std::string debug_string();
 
@@ -126,6 +138,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext> {
 
     void incr_num_ctx_scheduling(int64_t num) { _scanner_ctx_sched_counter->update(num); }
     void incr_ctx_scheduling_time(int64_t num) { _scanner_ctx_sched_time->update(num); }
+    void incr_num_scanner_scheduling(int64_t num) { _scanner_sched_counter->update(num); }
 
     std::string parent_name();
 
@@ -133,7 +146,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext> {
     // todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when executing shared scan
     inline bool should_be_scheduled() const {
-        return !done() && (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
+        return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
                (_serving_blocks_num < allowed_blocks_num());
     }
 
@@ -156,8 +169,6 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext> {
 
     SimplifiedScanScheduler* get_simple_scan_scheduler() { return _simple_scan_scheduler; }
 
-    void stop_scanners(RuntimeState* state);
-
     void reschedule_scanner_ctx();
 
     // the unique id of this context
@@ -170,12 +181,17 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext> {
 
     std::weak_ptr<TaskExecutionContext> get_task_execution_context() { return _task_exec_ctx; }
 
+private:
+    template <typename Parent>
+    Status _close_and_clear_scanners(Parent* parent, RuntimeState* state);
+
 protected:
     ScannerContext(RuntimeState* state_, const TupleDescriptor* output_tuple_desc,
-                   const std::list<std::shared_ptr<ScannerDelegate>>& scanners_, int64_t limit_,
+                   const std::list<VScannerSPtr>& scanners_, int64_t limit_,
                    int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances,
                    pipeline::ScanLocalStateBase* local_state,
-                   std::shared_ptr<pipeline::ScanDependency> dependency);
+                   std::shared_ptr<pipeline::ScanDependency> dependency,
+                   std::shared_ptr<pipeline::Dependency> finish_dependency);
 
     virtual void _dispose_coloate_blocks_not_in_queue() {}
 
     void _set_scanner_done();
@@ -259,11 +275,9 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext> {
     // and then if the scanner is not finished, it will be pushed back to this list.
     // No need to protect it with a lock, because only one scheduler thread will access it.
     std::mutex _scanners_lock;
-    // Scanner's ownership belong to vscannode or scanoperator, scanner context does not own it.
-    // ScannerContext has to check if scanner is deconstructed before use it.
-    std::list<std::weak_ptr<ScannerDelegate>> _scanners;
+    std::list<VScannerSPtr> _scanners;
     // weak pointer for _scanners, used in stop function
-    std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners;
+    std::vector<VScannerWPtr> _scanners_ref;
     std::vector<int64_t> _finished_scanner_runtime;
     std::vector<int64_t> _finished_scanner_rows_read;
     std::vector<int64_t> _finished_scanner_wait_worker_time;
@@ -280,6 +294,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext> {
     RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
 
     std::shared_ptr<pipeline::ScanDependency> _dependency = nullptr;
+    std::shared_ptr<pipeline::Dependency> _finish_dependency = nullptr;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index a67b9d7f27ab58a..e8d7f8a7139a6d3 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -180,14 +180,20 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
     watch.reset();
     watch.start();
     ctx->incr_num_ctx_scheduling(1);
+    size_t size = 0;
+    Defer defer {[&]() {
+        ctx->incr_num_scanner_scheduling(size);
+        ctx->dec_num_scheduling_ctx();
+    }};
 
     if (ctx->done()) {
         return;
     }
 
-    std::list<std::weak_ptr<ScannerDelegate>> this_run;
+    std::list<VScannerSPtr> this_run;
     ctx->get_next_batch_of_scanners(&this_run);
-    if (this_run.empty()) {
+    size = this_run.size();
+    if (!size) {
         // There will be 2 cases when this_run is empty:
         // 1. The blocks queue reaches limit.
         //    The consumer will continue scheduling the ctx.
@@ -206,14 +212,9 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
     if (ctx->thread_token != nullptr) {
         // TODO llj tg how to treat this?
         while (iter != this_run.end()) {
-            std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
-            if (scanner_delegate == nullptr) {
-                continue;
-            }
-            scanner_delegate->_scanner->start_wait_worker_timer();
-            auto s = ctx->thread_token->submit_func([this, scanner_ref = *iter, ctx]() {
-                this->_scanner_scan(this, ctx, scanner_ref);
-            });
+            (*iter)->start_wait_worker_timer();
+            auto s = ctx->thread_token->submit_func(
+                    [this, scanner = *iter, ctx] { this->_scanner_scan(this, ctx, scanner); });
             if (s.ok()) {
                 this_run.erase(iter++);
             } else {
@@ -223,32 +224,28 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
         }
     } else {
         while (iter != this_run.end()) {
-            std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
-            if (scanner_delegate == nullptr) {
-                continue;
-            }
-            scanner_delegate->_scanner->start_wait_worker_timer();
-            TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
+            (*iter)->start_wait_worker_timer();
+            TabletStorageType type = (*iter)->get_storage_type();
             bool ret = false;
             if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
                 if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
-                    auto work_func = [this, scanner_ref = *iter, ctx]() {
-                        this->_scanner_scan(this, ctx, scanner_ref);
+                    auto work_func = [this, scanner = *iter, ctx] {
+                        this->_scanner_scan(this, ctx, scanner);
                     };
                     SimplifiedScanTask simple_scan_task = {work_func, ctx};
                     ret = scan_sche->get_scan_queue()->try_put(simple_scan_task);
                 } else {
                     PriorityThreadPool::Task task;
-                    task.work_function = [this, scanner_ref = *iter, ctx]() {
-                        this->_scanner_scan(this, ctx, scanner_ref);
+                    task.work_function = [this, scanner = *iter, ctx] {
+                        this->_scanner_scan(this, ctx, scanner);
                     };
                     task.priority = nice;
                     ret = _local_scan_thread_pool->offer(task);
                 }
             } else {
                 PriorityThreadPool::Task task;
-                task.work_function = [this, scanner_ref = *iter, ctx]() {
-                    this->_scanner_scan(this, ctx, scanner_ref);
+                task.work_function = [this, scanner = *iter, ctx] {
+                    this->_scanner_scan(this, ctx, scanner);
                 };
                 task.priority = nice;
                 ret = _remote_scan_thread_pool->offer(task);
@@ -266,22 +263,13 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
 }
 
 void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
-                                     std::shared_ptr<ScannerContext> ctx,
-                                     std::weak_ptr<ScannerDelegate> scanner_ref) {
-    Defer defer {[&]() { ctx->dec_num_running_scanners(1); }};
+                                     std::shared_ptr<ScannerContext> ctx, VScannerSPtr scanner) {
     auto task_lock = ctx->get_task_execution_context().lock();
     if (task_lock == nullptr) {
         // LOG(WARNING) << "could not lock task execution context, query " << print_id(_query_id)
         //              << " maybe finished";
         return;
     }
-    // will release scanner if it is the last one, task lock is hold here, to ensure
-    // that scanner could call scannode's method during deconstructor
-    std::shared_ptr<ScannerDelegate> scanner_delegate = scanner_ref.lock();
-    auto& scanner = scanner_delegate->_scanner;
-    if (scanner_delegate == nullptr) {
-        return;
-    }
     SCOPED_ATTACH_TASK(scanner->runtime_state());
     // for cpu hard limit, thread name should not be reset
     if (ctx->_should_reset_thread_name) {
@@ -412,7 +400,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
     if (eos || should_stop) {
         scanner->mark_to_need_to_close();
     }
-    ctx->push_back_scanner_and_reschedule(scanner_delegate);
+    ctx->push_back_scanner_and_reschedule(scanner);
 }
 
 void ScannerScheduler::_register_metrics() {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 9fedd27dbd8bb3c..eb4d1380e3947cc 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -36,7 +36,7 @@ class BlockingQueue;
 } // namespace doris
 
 namespace doris::vectorized {
-class ScannerDelegate;
+
 class ScannerContext;
 
 // Responsible for the scheduling and execution of all Scanners of a BE node.
@@ -79,7 +79,7 @@ class ScannerScheduler {
     void _schedule_scanners(std::shared_ptr<ScannerContext> ctx);
     // execution thread function
     void _scanner_scan(ScannerScheduler* scheduler, std::shared_ptr<ScannerContext> ctx,
-                       std::weak_ptr<ScannerDelegate> scanner);
+                       VScannerSPtr scanner);
 
     void _register_metrics();
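The Defer introduced in _schedule_scanners guarantees that dec_num_scheduling_ctx() (and the scanner-scheduling profile update) runs on every exit path, including the early return when the context is already done; without it the scheduling counter would leak and clear_and_join would wait forever. Defer is an RAII scope guard; a minimal version of the idea (a sketch in the spirit of Doris' helper, not its actual implementation):

#include <functional>
#include <utility>

// Minimal RAII scope guard: runs the stored callable when the scope exits.
class Defer {
public:
    explicit Defer(std::function<void()> fn) : _fn(std::move(fn)) {}
    ~Defer() {
        if (_fn) _fn(); // runs on every exit path, including early returns
    }
    Defer(const Defer&) = delete;
    Defer& operator=(const Defer&) = delete;

private:
    std::function<void()> _fn;
};

// Usage mirroring the scheduler change (names simplified):
//   size_t size = 0;
//   Defer defer {[&]() { ctx->dec_num_scheduling_ctx(); }};
//   if (ctx->done()) return; // the counter is still decremented here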
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index b780fc1a8a97f02..5176d7900b3c7ed 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -273,7 +273,7 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
     reached_limit(block, eos);
     if (*eos) {
         // reach limit, stop the scanners.
-        _scanner_ctx->stop_scanners(state);
+        _scanner_ctx->set_should_stop();
     }
 
     return Status::OK();
@@ -318,8 +318,8 @@ Status VScanNode::_init_profile() {
     return Status::OK();
 }
 
-void VScanNode::_start_scanners(const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
-                                const int query_parallel_instance_num) {
+Status VScanNode::_start_scanners(const std::list<VScannerSPtr>& scanners,
+                                  const int query_parallel_instance_num) {
     if (_is_pipeline_scan) {
         int max_queue_size = _shared_scan_opt ? std::max(query_parallel_instance_num, 1) : 1;
         _scanner_ctx = pipeline::PipScannerContext::create_shared(
@@ -329,29 +329,41 @@
     } else {
         _scanner_ctx = ScannerContext::create_shared(_state, this, _output_tuple_desc, scanners,
                                                      limit(), _state->scan_queue_mem_limit());
     }
+    return Status::OK();
 }
 
 Status VScanNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
-
     RETURN_IF_ERROR(ExecNode::close(state));
     return Status::OK();
 }
 
 void VScanNode::release_resource(RuntimeState* state) {
     if (_scanner_ctx) {
-        if (!state->enable_pipeline_exec() || _should_create_scanner) {
+        if (!state->enable_pipeline_exec()) {
             // stop and wait the scanner scheduler to be done
             // _scanner_ctx may not be created for some short circuit case.
-            _scanner_ctx->stop_scanners(state);
+            _scanner_ctx->set_should_stop();
+            _scanner_ctx->clear_and_join(this, state);
+        } else if (_should_create_scanner) {
+            _scanner_ctx->clear_and_join(this, state);
         }
     }
-    _scanners.clear();
+
     ExecNode::release_resource(state);
 }
 
+Status VScanNode::try_close(RuntimeState* state) {
+    if (_scanner_ctx) {
+        // Mark this scanner ctx as should_stop to make sure scanners will not be scheduled anymore.
+        // TODO: the lock taken in `set_should_stop` may cause a slight performance impact.
+        _scanner_ctx->set_should_stop();
+    }
+    return Status::OK();
+}
+
 Status VScanNode::_normalize_conjuncts() {
     // The conjuncts are always on the output tuple, so use _output_tuple_desc;
     std::vector<SlotDescriptor*> slots = _output_tuple_desc->slots();
@@ -1317,15 +1329,11 @@ VScanNode::PushDownType VScanNode::_should_push_down_in_predicate(VInPredicate* pred,
 Status VScanNode::_prepare_scanners(const int query_parallel_instance_num) {
     std::list<VScannerSPtr> scanners;
     RETURN_IF_ERROR(_init_scanners(&scanners));
-    // Init scanner wrapper
-    for (auto it = scanners.begin(); it != scanners.end(); ++it) {
-        _scanners.emplace_back(std::make_shared<ScannerDelegate>(*it));
-    }
     if (scanners.empty()) {
         _eos = true;
     } else {
         COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
-        _start_scanners(_scanners, query_parallel_instance_num);
+        RETURN_IF_ERROR(_start_scanners(scanners, query_parallel_instance_num));
     }
     return Status::OK();
 }
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index d4a054cacd5a829..5917d0ff46b5eb5 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -87,16 +87,6 @@ struct FilterPredicates {
     std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>> in_filters;
 };
 
-// We want to close scanner automatically, so using a delegate class
-// and call close method in the delegate class's dctor.
-class ScannerDelegate {
-public:
-    VScannerSPtr _scanner;
-    ScannerDelegate(VScannerSPtr& scanner_ptr) : _scanner(scanner_ptr) {}
-    ~ScannerDelegate() { static_cast<void>(_scanner->close(_scanner->runtime_state())); }
-    ScannerDelegate(ScannerDelegate&&) = delete;
-};
-
 class VScanNode : public ExecNode, public RuntimeFilterConsumer {
 public:
     VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
@@ -166,6 +156,8 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer {
     Status alloc_resource(RuntimeState* state) override;
     void release_resource(RuntimeState* state) override;
 
+    Status try_close(RuntimeState* state);
+
     bool should_run_serial() const {
         return _should_run_serial || _state->enable_scan_node_run_serial();
     }
@@ -270,11 +262,8 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer {
     int _max_scan_key_num;
     int _max_pushdown_conditions_per_column;
 
-    // ScanNode owns the ownership of scanner, scanner context only has its weakptr
-    std::list<std::shared_ptr<ScannerDelegate>> _scanners;
-
-    // Each scan node will generates a ScannerContext to do schedule work
-    // ScannerContext will be added to scanner scheduler
+    // Each scan node generates a ScannerContext to manage all of its Scanners.
+    // See the comments of ScannerContext for more details.
     std::shared_ptr<ScannerContext> _scanner_ctx = nullptr;
 
     // indicate this scan node has no more data to return
@@ -448,8 +437,8 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer {
                                      const std::string& fn_name, int slot_ref_child = -1);
 
     // Submit the scanner to the thread pool and start execution
-    void _start_scanners(const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
-                         const int query_parallel_instance_num);
+    Status _start_scanners(const std::list<VScannerSPtr>& scanners,
+                           const int query_parallel_instance_num);
 };
 } // namespace doris::vectorized
 
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 6046d87ac91ef55..29daf9a68c557f6 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -145,6 +145,16 @@ class VScanner {
 
     void set_status_on_failure(const Status& st) { _status = st; }
 
+    // Return false if _is_counted_down is already true;
+    // otherwise, set _is_counted_down to true and return true.
+    bool set_counted_down() {
+        if (_is_counted_down) {
+            return false;
+        }
+        _is_counted_down = true;
+        return true;
+    }
+
 protected:
     void _discard_conjuncts() {
         for (auto& conjunct : _conjuncts) {
@@ -205,6 +215,8 @@ class VScanner {
     int64_t _scan_cpu_timer = 0;
 
     bool _is_load = false;
+    // set to true after decreasing "_num_unfinished_scanners" in the scanner context
+    bool _is_counted_down = false;
 
     bool _is_init = true;
 
@@ -215,5 +227,6 @@ class VScanner {
 };
 
 using VScannerSPtr = std::shared_ptr<VScanner>;
+using VScannerWPtr = std::weak_ptr<VScanner>;
 
 } // namespace doris::vectorized
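Finally, set_counted_down is a test-and-set guard: a finished scanner can be observed by more than one scheduling run after it is pushed back, so _num_unfinished_scanners must be decremented exactly once per scanner. A compact sketch of how the guard composes with the counter (simplified; in the real code the flag is a plain bool because the caller already holds _transfer_lock):

// Sketch of the once-only count-down used by push_back_scanner_and_reschedule.
struct MiniScanner {
    bool need_to_close = false;
    bool counted_down = false;

    // Returns true exactly once; every later call returns false.
    bool set_counted_down() {
        if (counted_down) {
            return false;
        }
        counted_down = true;
        return true;
    }
};

// Caller side (assumed to run under the context's transfer lock):
//   if (scanner.need_to_close && scanner.set_counted_down() &&
//       --num_unfinished_scanners == 0) {
//       // mark the whole context finished, exactly once
//   }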