diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index a9769e7b679287..ce32cc5cf39189 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -532,8 +532,9 @@ class MutableBlock { } else { if (_columns.size() != block.columns()) { return Status::Error( - "Merge block not match, self:[{}], input:[{}], ", dump_types(), - block.dump_types()); + "Merge block not match, self:[columns: {}, types: {}], input:[columns: {}, " + "types: {}], ", + dump_names(), dump_types(), block.dump_names(), block.dump_types()); } for (int i = 0; i < _columns.size(); ++i) { if (!_data_types[i]->equals(*block.get_by_position(i).type)) { diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index d81e327b8862c3..571df35e55ed85 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -275,7 +275,11 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, scan_task->cached_blocks.back()->rows() + free_block->rows() <= ctx->batch_size()) { size_t block_size = scan_task->cached_blocks.back()->allocated_bytes(); vectorized::MutableBlock mutable_block(scan_task->cached_blocks.back().get()); - static_cast(mutable_block.merge(*free_block)); + status = mutable_block.merge(*free_block); + if (!status.ok()) { + LOG(WARNING) << "Block merge failed: " << status.to_string(); + break; + } scan_task->cached_blocks.back().get()->set_columns( std::move(mutable_block.mutable_columns())); ctx->return_free_block(std::move(free_block)); diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 39a9059d1d37c8..7354b9e085fd85 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -22,6 +22,7 @@ #include "common/config.h" #include "pipeline/exec/scan_operator.h" #include "runtime/descriptors.h" +#include "util/defer_op.h" #include "util/runtime_profile.h" #include "vec/core/column_with_type_and_name.h" #include "vec/exec/scan/vscan_node.h" @@ -152,6 +153,14 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { } Status VScanner::_filter_output_block(Block* block) { + Defer clear_tmp_block([&]() { + auto all_column_names = block->get_names(); + for (auto& name : all_column_names) { + if (name.rfind(BeConsts::BLOCK_TEMP_COLUMN_PREFIX, 0) == 0) { + block->erase(name); + } + } + }); if (block->has(BeConsts::BLOCK_TEMP_COLUMN_SCANNER_FILTERED)) { // scanner filter_block is already done (only by _topn_next currently), just skip it return Status::OK(); @@ -159,12 +168,6 @@ Status VScanner::_filter_output_block(Block* block) { auto old_rows = block->rows(); Status st = VExprContext::filter_block(_conjuncts, block, block->columns()); _counter.num_rows_unselected += old_rows - block->rows(); - auto all_column_names = block->get_names(); - for (auto& name : all_column_names) { - if (name.rfind(BeConsts::BLOCK_TEMP_COLUMN_PREFIX, 0) == 0) { - block->erase(name); - } - } return st; }