Skip to content

Commit

Permalink
[opt](scan) merge small tablets into one scanner
Browse files Browse the repository at this point in the history
  • Loading branch information
mrhhsg committed Aug 29, 2024
1 parent 63b8949 commit 8d5b485
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 10 deletions.
50 changes: 46 additions & 4 deletions be/src/olap/parallel_scanner_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "olap/rowset/beta_rowset.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "vec/exec/scan/new_olap_scanner.h"
#include "vec/exec/scan/union_scanner.h"

namespace doris {

Expand All @@ -41,6 +42,8 @@ Status ParallelScannerBuilder::build_scanners(std::list<VScannerSPtr>& scanners)
Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>& scanners) {
DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner);

std::vector<std::shared_ptr<NewOlapScanner>> small_scanners;
std::vector<size_t> small_scanners_rows;
for (auto&& [tablet, version] : _tablets) {
DCHECK(_all_rowsets.contains(tablet->tablet_id()));
auto& rowsets = _all_rowsets[tablet->tablet_id()];
Expand Down Expand Up @@ -156,10 +159,49 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
split.segment_offsets.second - split.segment_offsets.first);
}
#endif
scanners.emplace_back(
_build_scanner(tablet, version, _key_ranges,
{std::move(read_source.rs_splits),
reade_source_with_delete_info.delete_predicates}));
auto scanner = _build_scanner(tablet, version, _key_ranges,
{std::move(read_source.rs_splits),
reade_source_with_delete_info.delete_predicates});
if (rows_collected < _rows_per_scanner * 0.5) {
small_scanners.emplace_back(std::move(scanner));
small_scanners_rows.emplace_back(rows_collected);
} else {
scanners.emplace_back(std::move(scanner));
}
}
}

if (!small_scanners.empty()) {
if ((scanners.size() + small_scanners.size()) >= _max_scanners_count) {
auto union_scanner =
UnionScanner::create_shared(_state, _parent, _limit, _scanner_profile.get());
size_t rows_collected = 0;
for (size_t i = 0; i != small_scanners.size(); ++i) {
union_scanner->add_scanner(std::move(small_scanners[i]));
rows_collected += small_scanners_rows[i];

if (rows_collected >= _rows_per_scanner) {
LOG(INFO) << "merge " << union_scanner->get_scanners_count()
<< " tablets' readers into one, contain rows: " << rows_collected;
rows_collected = 0;
scanners.emplace_back(std::move(union_scanner));
union_scanner = UnionScanner::create_shared(_state, _parent, _limit,
_scanner_profile.get());
}
}

const auto count = union_scanner->get_scanners_count();
if (count > 1) {
LOG(INFO) << "merge " << count
<< " tablets' readers into one, contain rows: " << rows_collected;
scanners.emplace_back(std::move(union_scanner));
} else if (count == 1) {
scanners.emplace_back(union_scanner->get_single_scanner());
}
} else {
for (auto& scanner : small_scanners) {
scanners.emplace_back(std::move(scanner));
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/rowset_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct RowSetSplits {
std::vector<RowRanges> segment_row_ranges;

RowSetSplits(RowsetReaderSharedPtr rs_reader_)
: rs_reader(rs_reader_), segment_offsets({0, 0}) {}
: rs_reader(std::move(rs_reader_)), segment_offsets({0, 0}) {}
RowSetSplits() = default;
};

Expand Down
5 changes: 2 additions & 3 deletions be/src/pipeline/exec/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,8 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s

RETURN_IF_ERROR(scanner_builder.build_scanners(*scanners));
for (auto& scanner : *scanners) {
auto* olap_scanner = assert_cast<vectorized::NewOlapScanner*>(scanner.get());
RETURN_IF_ERROR(olap_scanner->prepare(state(), _conjuncts));
olap_scanner->set_compound_filters(_compound_filters);
RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts));
scanner->set_compound_filters(_compound_filters);
}
return Status::OK();
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/exec/scan/new_olap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#pragma once

#include <gen_cpp/PaloInternalService_types.h>
#include <stdint.h>

#include <memory>
#include <string>
Expand Down Expand Up @@ -51,6 +50,7 @@ struct FilterPredicates;
namespace vectorized {

class Block;
class UnionScanner;

class NewOlapScanner : public VScanner {
ENABLE_FACTORY_CREATOR(NewOlapScanner);
Expand All @@ -75,11 +75,12 @@ class NewOlapScanner : public VScanner {

Status close(RuntimeState* state) override;

void set_compound_filters(const std::vector<TCondition>& compound_filters);
void set_compound_filters(const std::vector<TCondition>& compound_filters) override;

doris::TabletStorageType get_storage_type() override;

protected:
friend class UnionScanner;
Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
void _collect_profile_before_close() override;

Expand Down
93 changes: 93 additions & 0 deletions be/src/vec/exec/scan/union_scanner.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "vec/exec/scan/union_scanner.h"

namespace doris::vectorized {

// Initialises every wrapped scanner in insertion order and stops at the first
// failure. `_is_init` is flipped to true only after all of them succeeded.
Status UnionScanner::init() {
    const size_t total = _olap_scanners.size();
    for (size_t idx = 0; idx < total; ++idx) {
        RETURN_IF_ERROR(_olap_scanners[idx]->init());
    }
    _is_init = true;
    return Status::OK();
}

// Prepares the union itself through the base class first, then forwards the
// same runtime state and conjuncts to each wrapped scanner. The first error
// aborts the sequence and is returned to the caller.
Status UnionScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
    RETURN_IF_ERROR(VScanner::prepare(state, conjuncts));
    const size_t total = _olap_scanners.size();
    for (size_t idx = 0; idx < total; ++idx) {
        RETURN_IF_ERROR(_olap_scanners[idx]->prepare(state, conjuncts));
    }
    return Status::OK();
}

// Opens only the first wrapped scanner. The remaining scanners are opened one
// at a time by `_get_block_impl` when the scanner before them reaches
// end-of-stream, so at most one wrapped scanner is opened by this class at a
// time.
//
// Returns OK without doing anything when no scanner has been added: the other
// entry points (`close`, `_get_block_impl`, `_collect_profile_before_close`)
// already treat "cursor == size" as "nothing left", and an unconditional
// `_olap_scanners[0]` here would be an out-of-bounds access in that case.
//
// NOTE(review): the base `VScanner::open` is not invoked here — confirm that
// this is intentional.
Status UnionScanner::open(RuntimeState* state) {
    if (_olap_scanners.empty()) {
        return Status::OK();
    }
    RETURN_IF_ERROR(_olap_scanners.front()->open(state));
    return Status::OK();
}

// Closes the wrapped scanner the cursor currently points at, if any.
// Scanners before the cursor were already closed by `_get_block_impl` when
// they reached end-of-stream, and that function opens a scanner only when the
// cursor advances onto it, so scanners after the cursor need no closing here.
//
// NOTE(review): the base `VScanner::close` is not invoked here — confirm that
// this is intentional.
Status UnionScanner::close(RuntimeState* state) {
    const bool all_consumed = (_scanner_cursor == _olap_scanners.size());
    if (all_consumed) {
        return Status::OK();
    }

    RETURN_IF_ERROR(_olap_scanners[_scanner_cursor]->close(state));
    return Status::OK();
}

// Forwards the same compound filter list to every wrapped scanner.
void UnionScanner::set_compound_filters(const std::vector<TCondition>& compound_filters) {
    const size_t total = _olap_scanners.size();
    for (size_t idx = 0; idx < total; ++idx) {
        _olap_scanners[idx]->set_compound_filters(compound_filters);
    }
}

// Reports the storage type of the first wrapped scanner.
// Precondition: at least one scanner has been added.
// NOTE(review): presumably all wrapped scanners share one storage type, since
// only the first is consulted — confirm against the builder.
TabletStorageType UnionScanner::get_storage_type() {
    auto& first_scanner = _olap_scanners.front();
    return first_scanner->get_storage_type();
}

// Produces the next block by draining the wrapped scanners one after another.
//
// - If every wrapped scanner has already been consumed, sets `*eos` to true
//   and returns immediately.
// - Otherwise reads from the scanner at `_scanner_cursor`. When that scanner
//   reports end-of-stream, its profile is collected, it is closed, the cursor
//   advances, and the next scanner (if any) is opened.
// - Keeps looping while the block is empty, so an exhausted scanner that
//   yielded no rows does not surface as an empty block while more scanners
//   remain.
//
// `*eos` is left false on the call that exhausts the last scanner; the
// following call reports end-of-stream through the early check above.
Status UnionScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eos) {
    const size_t total = _olap_scanners.size();
    if (_scanner_cursor == total) [[unlikely]] {
        *eos = true;
        return Status::OK();
    }

    while (true) {
        auto& current = _olap_scanners[_scanner_cursor];
        bool current_exhausted = false;
        *eos = false;
        RETURN_IF_ERROR(current->get_block(state, block, &current_exhausted));

        if (current_exhausted) {
            // Retire the finished scanner before moving on to the next one.
            current->_collect_profile_before_close();
            RETURN_IF_ERROR(current->close(state));
            ++_scanner_cursor;
            if (_scanner_cursor == total) {
                // Last scanner done: hand back whatever the block holds.
                return Status::OK();
            }
            RETURN_IF_ERROR(_olap_scanners[_scanner_cursor]->open(state));
        }

        if (!block->empty()) {
            return Status::OK();
        }
    }
}

// Collects the profile of the wrapped scanner the cursor currently points at.
// Scanners before the cursor had their profile collected by `_get_block_impl`
// just before they were closed; nothing is done once all are consumed.
void UnionScanner::_collect_profile_before_close() {
    if (_scanner_cursor == _olap_scanners.size()) {
        return;
    }
    _olap_scanners[_scanner_cursor]->_collect_profile_before_close();
}

} // namespace doris::vectorized
78 changes: 78 additions & 0 deletions be/src/vec/exec/scan/union_scanner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "common/factory_creator.h"
#include "common/status.h"
#include "vec/exec/scan/new_olap_scanner.h"
#include "vec/exec/scan/vscanner.h"

namespace doris::vectorized {

/**
`UnionScanner` contains more than 1 `NewOlapScanner`
*/
class UnionScanner : public VScanner {
ENABLE_FACTORY_CREATOR(UnionScanner)
public:
UnionScanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_state, int64_t limit,
RuntimeProfile* profile)
: VScanner(state, local_state, limit, profile) {
_is_init = false;
}

void add_scanner(std::shared_ptr<NewOlapScanner> scanners) {
_olap_scanners.emplace_back(std::move(scanners));
}

[[nodiscard]] std::shared_ptr<NewOlapScanner> get_single_scanner() {
DCHECK_EQ(_olap_scanners.size(), 1);
return std::move(_olap_scanners[0]);
}

[[nodiscard]] size_t get_scanners_count() const { return _olap_scanners.size(); }

Status init() override;

Status prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts) override;

Status open(RuntimeState* state) override;

Status close(RuntimeState* state) override;

void set_compound_filters(const std::vector<TCondition>& compound_filters) override;

doris::TabletStorageType get_storage_type() override;

std::string get_name() override { return "UnionScanner"; }

protected:
Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
void _collect_profile_before_close() override;

private:
size_t _scanner_cursor {0};
std::vector<std::shared_ptr<NewOlapScanner>> _olap_scanners;
};

} // namespace doris::vectorized
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/vscanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class VScanner {

virtual std::string get_name() { return ""; }

virtual void set_compound_filters(const std::vector<TCondition>& compound_filters) {}

// return the readable name of current scan range.
// eg, for file scanner, return the current file path.
virtual std::string get_current_scan_range_name() { return "not implemented"; }
Expand Down

0 comments on commit 8d5b485

Please sign in to comment.