Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
BiteTheDDDDt committed Aug 30, 2024
1 parent 03da821 commit 0afe5a9
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 25 deletions.
31 changes: 11 additions & 20 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ Status ColumnReader::create_agg_state(const ColumnReaderOptions& opts, const Col

auto data_type = vectorized::DataTypeFactory::instance().create_data_type(meta);
const auto* agg_state_type = assert_cast<const vectorized::DataTypeAggState*>(data_type.get());
agg_state_type->check_agg_state_compatibility(opts.be_exec_version);
auto type = agg_state_type->get_serialized_type()->get_type_as_type_descriptor().type;

if (read_as_string(type)) {
Expand Down Expand Up @@ -250,14 +251,12 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ColumnMetaPB&
}

ColumnReader::ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
uint64_t num_rows, io::FileReaderSPtr file_reader,
vectorized::DataTypePtr agg_state_ptr)
uint64_t num_rows, io::FileReaderSPtr file_reader)
: _use_index_page_cache(!config::disable_storage_page_cache),
_opts(opts),
_num_rows(num_rows),
_file_reader(std::move(file_reader)),
_dict_encoding_type(UNKNOWN_DICT_ENCODING),
_agg_state_ptr(std::move(agg_state_ptr)) {
_dict_encoding_type(UNKNOWN_DICT_ENCODING) {
_meta_length = meta.length();
_meta_type = (FieldType)meta.type();
if (_meta_type == FieldType::OLAP_FIELD_TYPE_ARRAY) {
Expand All @@ -278,13 +277,18 @@ ColumnReader::~ColumnReader() {

Status ColumnReader::init(const ColumnMetaPB* meta) {
_type_info = get_type_info(meta);

if (meta->has_be_exec_version()) {
_be_exec_version = meta->be_exec_version();
}

if (_type_info == nullptr) {
return Status::NotSupported("unsupported typeinfo, type={}", meta->type());
}
RETURN_IF_ERROR(EncodingInfo::get(_type_info.get(), meta->encoding(), &_encoding_info));

for (int i = 0; i < meta->indexes_size(); i++) {
auto& index_meta = meta->indexes(i);
const auto& index_meta = meta->indexes(i);
switch (index_meta.type()) {
case ORDINAL_INDEX:
_ordinal_index.reset(
Expand Down Expand Up @@ -726,21 +730,8 @@ Status ColumnReader::new_iterator(ColumnIterator** iterator) {
}

Status ColumnReader::new_agg_state_iterator(ColumnIterator** iterator) {
if (!_agg_state_ptr) { // meet old version ColumnMetaPB
*iterator = new FileColumnIterator(this);
return Status::OK();
}

const auto* agg_state_type =
assert_cast<const vectorized::DataTypeAggState*>(_agg_state_ptr.get());
auto type = agg_state_type->get_serialized_type()->get_type_as_type_descriptor().type;

if (read_as_string(type)) {
*iterator = new FileColumnIterator(this);
return Status::OK();
}

return Status::InternalError("Not supported");
*iterator = new FileColumnIterator(this);
return Status::OK();
}

Status ColumnReader::new_array_iterator(ColumnIterator** iterator) {
Expand Down
13 changes: 10 additions & 3 deletions be/src/olap/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ struct ColumnReaderOptions {
bool verify_checksum = true;
// for in memory olap table, use DURABLE CachePriority in page cache
bool kept_in_memory = false;

int be_exec_version = -1;
};

struct ColumnIteratorOptions {
Expand Down Expand Up @@ -206,7 +208,7 @@ class ColumnReader {

private:
ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, uint64_t num_rows,
io::FileReaderSPtr file_reader, vectorized::DataTypePtr agg_state_ptr = nullptr);
io::FileReaderSPtr file_reader);
Status init(const ColumnMetaPB* meta);

// Read column inverted indexes into memory
Expand Down Expand Up @@ -242,12 +244,19 @@ class ColumnReader {

Status _calculate_row_ranges(const std::vector<uint32_t>& page_indexes, RowRanges* row_ranges);

Status check_be_exec_version(int read_be_exec_version) {
if (_meta_type == FieldType::OLAP_FIELD_TYPE_AGG_STATE) {
}
return Status::OK();
}

private:
int64_t _meta_length;
FieldType _meta_type;
FieldType _meta_children_column_type;
bool _meta_is_nullable;
bool _use_index_page_cache;
int _be_exec_version = -1;

PagePointer _meta_dict_page;
CompressionTypePB _meta_compression;
Expand Down Expand Up @@ -276,8 +285,6 @@ class ColumnReader {

std::vector<std::unique_ptr<ColumnReader>> _sub_readers;

vectorized::DataTypePtr _agg_state_ptr;

DorisCallOnce<Status> _set_dict_encoding_type_once;
};

Expand Down
16 changes: 14 additions & 2 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ Status Segment::_open_inverted_index() {

Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_options,
std::unique_ptr<RowwiseIterator>* iter) {
if (read_options.runtime_state != nullptr) {
_be_exec_version = read_options.runtime_state->be_exec_version();
}
RETURN_IF_ERROR(_create_column_readers_once());

read_options.stats->total_segment_number++;
Expand Down Expand Up @@ -502,6 +505,7 @@ Status Segment::_create_column_readers(const SegmentFooterPB& footer) {

ColumnReaderOptions opts {
.kept_in_memory = _tablet_schema->is_in_memory(),
.be_exec_version = _be_exec_version,
};
std::unique_ptr<ColumnReader> reader;
RETURN_IF_ERROR(ColumnReader::create(opts, footer.columns(iter->second), footer.num_rows(),
Expand All @@ -522,8 +526,10 @@ Status Segment::_create_column_readers(const SegmentFooterPB& footer) {
continue;
}
const ColumnMetaPB& column_pb = footer.columns(iter->second);
ColumnReaderOptions opts;
opts.kept_in_memory = _tablet_schema->is_in_memory();
ColumnReaderOptions opts {
.kept_in_memory = _tablet_schema->is_in_memory(),
.be_exec_version = _be_exec_version,
};
std::unique_ptr<ColumnReader> reader;
RETURN_IF_ERROR(
ColumnReader::create(opts, column_pb, footer.num_rows(), _file_reader, &reader));
Expand Down Expand Up @@ -736,6 +742,9 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
Status Segment::new_column_iterator(const TabletColumn& tablet_column,
std::unique_ptr<ColumnIterator>* iter,
const StorageReadOptions* opt) {
if (opt->runtime_state != nullptr) {
_be_exec_version = opt->runtime_state->be_exec_version();
}
RETURN_IF_ERROR(_create_column_readers_once());

// init column iterator by path info
Expand Down Expand Up @@ -809,6 +818,9 @@ Status Segment::new_inverted_index_iterator(const TabletColumn& tablet_column,
const TabletIndex* index_meta,
const StorageReadOptions& read_options,
std::unique_ptr<InvertedIndexIterator>* iter) {
if (read_options.runtime_state != nullptr) {
_be_exec_version = read_options.runtime_state->be_exec_version();
}
RETURN_IF_ERROR(_create_column_readers_once());
ColumnReader* reader = _get_column_reader(tablet_column);
if (reader != nullptr && index_meta) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <unordered_map>
#include <vector>

#include "agent/be_exec_version_manager.h"
#include "common/status.h" // Status
#include "io/fs/file_reader_writer_fwd.h"
#include "io/fs/file_system.h"
Expand Down Expand Up @@ -285,6 +286,8 @@ class Segment : public std::enable_shared_from_this<Segment> {
DorisCallOnce<Status> _inverted_index_file_reader_open;

InvertedIndexFileInfo _idx_file_info;

int _be_exec_version = BeExecVersionManager::get_newest_version();
};

} // namespace segment_v2
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/data_types/data_type_agg_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ class DataTypeAggState : public DataTypeString {

DataTypePtr get_serialized_type() const { return _agg_serialized_type; }

void check_agg_state_compatibility(int read_be_exec_version) const {
BeExecVersionManager::check_agg_state_compatibility(read_be_exec_version, _be_exec_version,
get_nested_function()->get_name());
}

private:
std::string get_types_string() const {
std::string types;
Expand Down

0 comments on commit 0afe5a9

Please sign in to comment.