Skip to content

Commit

Permalink
[scan](status) Finish execution if scanner failed (#32966)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored and Doris-Extras committed Mar 29, 2024
1 parent bea05da commit 6600e92
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 9 deletions.
5 changes: 3 additions & 2 deletions be/src/vec/core/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -532,8 +532,9 @@ class MutableBlock {
} else {
if (_columns.size() != block.columns()) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"Merge block not match, self:[{}], input:[{}], ", dump_types(),
block.dump_types());
"Merge block not match, self:[columns: {}, types: {}], input:[columns: {}, "
"types: {}], ",
dump_names(), dump_types(), block.dump_names(), block.dump_types());
}
for (int i = 0; i < _columns.size(); ++i) {
if (!_data_types[i]->equals(*block.get_by_position(i).type)) {
Expand Down
6 changes: 5 additions & 1 deletion be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,11 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
scan_task->cached_blocks.back()->rows() + free_block->rows() <= ctx->batch_size()) {
size_t block_size = scan_task->cached_blocks.back()->allocated_bytes();
vectorized::MutableBlock mutable_block(scan_task->cached_blocks.back().get());
static_cast<void>(mutable_block.merge(*free_block));
status = mutable_block.merge(*free_block);
if (!status.ok()) {
LOG(WARNING) << "Block merge failed: " << status.to_string();
break;
}
scan_task->cached_blocks.back().get()->set_columns(
std::move(mutable_block.mutable_columns()));
ctx->return_free_block(std::move(free_block));
Expand Down
15 changes: 9 additions & 6 deletions be/src/vec/exec/scan/vscanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "common/config.h"
#include "pipeline/exec/scan_operator.h"
#include "runtime/descriptors.h"
#include "util/defer_op.h"
#include "util/runtime_profile.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/exec/scan/vscan_node.h"
Expand Down Expand Up @@ -152,19 +153,21 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
}

Status VScanner::_filter_output_block(Block* block) {
Defer clear_tmp_block([&]() {
auto all_column_names = block->get_names();
for (auto& name : all_column_names) {
if (name.rfind(BeConsts::BLOCK_TEMP_COLUMN_PREFIX, 0) == 0) {
block->erase(name);
}
}
});
if (block->has(BeConsts::BLOCK_TEMP_COLUMN_SCANNER_FILTERED)) {
// scanner filter_block is already done (only by _topn_next currently), just skip it
return Status::OK();
}
auto old_rows = block->rows();
Status st = VExprContext::filter_block(_conjuncts, block, block->columns());
_counter.num_rows_unselected += old_rows - block->rows();
auto all_column_names = block->get_names();
for (auto& name : all_column_names) {
if (name.rfind(BeConsts::BLOCK_TEMP_COLUMN_PREFIX, 0) == 0) {
block->erase(name);
}
}
return st;
}

Expand Down

0 comments on commit 6600e92

Please sign in to comment.