diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 3af8aa1b3a4bb1..2f21b1626a5e88 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -1224,7 +1224,8 @@ Status ScanLocalState::_start_scanners( auto& p = _parent->cast(); _scanner_ctx = PipXScannerContext::create_shared( state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(), - state()->scan_queue_mem_limit(), _scan_dependency); + state()->scan_queue_mem_limit(), _scan_dependency, + p.ignore_data_distribution() ? 1 : state()->query_parallel_instance_num()); return Status::OK(); } diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index e33523cf3bf710..e93e9956ff4096 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -47,9 +47,11 @@ class PipXScannerContext final : public vectorized::ScannerContext { const RowDescriptor* output_row_descriptor, const std::list>& scanners, int64_t limit_, int64_t max_bytes_in_blocks_queue, - std::shared_ptr dependency) + std::shared_ptr dependency, + const int num_parallel_instances) : vectorized::ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners, - limit_, max_bytes_in_blocks_queue, 1, local_state) { + limit_, max_bytes_in_blocks_queue, num_parallel_instances, + local_state) { _dependency = dependency; } diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 2ed0df998b49f4..cf1dd24641d4fb 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -78,7 +78,6 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu : config::doris_scanner_thread_pool_thread_num / (_local_state ? num_parallel_instances : state->query_parallel_instance_num()); - _max_thread_num *= num_parallel_instances; _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num; _max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size()); // 1. Calculate max concurrency