Skip to content

Commit

Permalink
Merge branch 'master' into sleep_change_part6
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp authored Aug 29, 2024
2 parents e7cb900 + 63b8949 commit 7530c63
Show file tree
Hide file tree
Showing 127 changed files with 2,286 additions and 781 deletions.
3 changes: 0 additions & 3 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ github:
- Clang Formatter
- CheckStyle
- P0 Regression (Doris Regression)
- P1 Regression (Doris Regression)
- External Regression (Doris External Regression)
- cloud_p1 (Doris Cloud Regression)
- cloud_p0 (Doris Cloud Regression)
- FE UT (Doris FE UT)
- BE UT (Doris BE UT)
Expand Down Expand Up @@ -114,7 +112,6 @@ github:
- Clang Formatter
- CheckStyle
- P0 Regression (Doris Regression)
- P1 Regression (Doris Regression)
- External Regression (Doris External Regression)
- FE UT (Doris FE UT)
- BE UT (Doris BE UT)
Expand Down
15 changes: 12 additions & 3 deletions be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -460,18 +460,27 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t

vectorized::RowSourcesBuffer row_sources_buf(
tablet->tablet_id(), dst_rowset_writer->context().tablet_path, reader_type);
tablet->sample_infos.resize(column_groups.size(), {0, 0, 0});
{
std::unique_lock<std::mutex> lock(tablet->sample_info_lock);
tablet->sample_infos.resize(column_groups.size(), {0, 0, 0});
}
// compact group one by one
for (auto i = 0; i < column_groups.size(); ++i) {
VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
bool is_key = (i == 0);
int64_t batch_size = config::compaction_batch_size != -1
? config::compaction_batch_size
: estimate_batch_size(i, tablet, merge_way_num);
RETURN_IF_ERROR(vertical_compact_one_group(
CompactionSampleInfo sample_info;
Status st = vertical_compact_one_group(
tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf,
src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output,
key_group_cluster_key_idxes, batch_size, &(tablet->sample_infos[i])));
key_group_cluster_key_idxes, batch_size, &sample_info);
{
std::unique_lock<std::mutex> lock(tablet->sample_info_lock);
tablet->sample_infos[i] = sample_info;
}
RETURN_IF_ERROR(st);
if (is_key) {
RETURN_IF_ERROR(row_sources_buf.flush());
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ Status BetaRowset::get_inverted_index_size(size_t* index_size) {
if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) {
auto indices = _schema->indexes();
for (auto& index : indices) {
// only get file_size for inverted index
if (index.index_type() != IndexType::INVERTED) {
continue;
}
for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
auto seg_path = DORIS_TRY(segment_path(seg_id));
int64_t file_size = 0;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
return Status::OK();
}

Status SegmentFlusher::_parse_variant_columns(vectorized::Block& block) {
Status SegmentFlusher::_internal_parse_variant_columns(vectorized::Block& block) {
size_t num_rows = block.rows();
if (num_rows == 0) {
return Status::OK();
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/rowset/segment_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,11 @@ class SegmentFlusher {
bool need_buffering();

private:
Status _parse_variant_columns(vectorized::Block& block);
// This method will catch exception when allocate memory failed
Status _parse_variant_columns(vectorized::Block& block) {
RETURN_IF_CATCH_EXCEPTION({ return _internal_parse_variant_columns(block); });
}
Status _internal_parse_variant_columns(vectorized::Block& block);
Status _add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
const vectorized::Block* block, size_t row_offset, size_t row_num);
Status _add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/page_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ Status PageIO::write_page(io::FileWriter* writer, const std::vector<Slice>& body
return Status::OK();
}

Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer) {
Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer) {
opts.sanity_check();
opts.stats->total_pages_num++;

Expand Down
11 changes: 10 additions & 1 deletion be/src/olap/rowset/segment_v2/page_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,17 @@ class PageIO {
// `handle' holds the memory of page data,
// `body' points to page body,
// `footer' stores the page footer.
// This method is exception safe, it will failed when allocate memory failed.
static Status read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer);
Slice* body, PageFooterPB* footer) {
RETURN_IF_CATCH_EXCEPTION(
{ return read_and_decompress_page_(opts, handle, body, footer); });
}

private:
// An internal method that not deal with exception.
static Status read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer);
};

} // namespace segment_v2
Expand Down
6 changes: 4 additions & 2 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_o
ColumnReader* reader = nullptr;
if (col.is_extracted_column()) {
auto relative_path = col.path_info_ptr()->copy_pop_front();
const auto* node = _sub_column_tree[col.unique_id()].find_exact(relative_path);
int32_t unique_id = col.unique_id() > 0 ? col.unique_id() : col.parent_unique_id();
const auto* node = _sub_column_tree[unique_id].find_exact(relative_path);
reader = node != nullptr ? node->data.reader.get() : nullptr;
} else {
reader = _column_readers.contains(col.unique_id())
Expand Down Expand Up @@ -775,8 +776,9 @@ ColumnReader* Segment::_get_column_reader(const TabletColumn& col) {
// init column iterator by path info
if (col.has_path_info() || col.is_variant_type()) {
auto relative_path = col.path_info_ptr()->copy_pop_front();
int32_t unique_id = col.unique_id() > 0 ? col.unique_id() : col.parent_unique_id();
const auto* node = col.has_path_info()
? _sub_column_tree[col.unique_id()].find_exact(relative_path)
? _sub_column_tree[unique_id].find_exact(relative_path)
: nullptr;
if (node != nullptr) {
return node->data.reader.get();
Expand Down
7 changes: 6 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1488,10 +1488,15 @@ Status SegmentIterator::_init_inverted_index_iterators() {
}
for (auto cid : _schema->column_ids()) {
if (_inverted_index_iterators[cid] == nullptr) {
// Not check type valid, since we need to get inverted index for related variant type when reading the segment.
// If check type valid, we can not get inverted index for variant type, and result nullptr.The result for calling
// get_inverted_index with variant suffix should return corresponding inverted index meta.
bool check_inverted_index_by_type = false;
// Use segment’s own index_meta, for compatibility with future indexing needs to default to lowercase.
RETURN_IF_ERROR(_segment->new_inverted_index_iterator(
_opts.tablet_schema->column(cid),
_segment->_tablet_schema->get_inverted_index(_opts.tablet_schema->column(cid)),
_segment->_tablet_schema->get_inverted_index(_opts.tablet_schema->column(cid),
check_inverted_index_by_type),
_opts, &_inverted_index_iterators[cid]));
}
}
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,7 @@ SchemaChangeJob::SchemaChangeJob(StorageEngine& local_storage_engine,
// The admin should upgrade all BE and then upgrade FE.
// Should delete the old code after upgrade finished.
Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& request) {
DBUG_EXECUTE_IF("SchemaChangeJob._do_process_alter_tablet.sleep", { sleep(10); })
Status res;
signal::tablet_id = _base_tablet->get_table_id();

Expand Down
3 changes: 2 additions & 1 deletion be/src/util/faststring.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ class faststring : private Allocator<false, false, false, DefaultMemoryAllocator
OwnedSlice build() {
uint8_t* ret = data_;
if (ret == initial_data_) {
ret = reinterpret_cast<uint8_t*>(Allocator::alloc(len_));
ret = reinterpret_cast<uint8_t*>(Allocator::alloc(capacity_));
DCHECK(len_ <= capacity_);
memcpy(ret, data_, len_);
}
OwnedSlice result(ret, len_, capacity_);
Expand Down
7 changes: 6 additions & 1 deletion be/src/util/slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,12 @@ class OwnedSlice : private Allocator<false, false, false, DefaultMemoryAllocator
OwnedSlice(const OwnedSlice&) = delete;
void operator=(const OwnedSlice&) = delete;

~OwnedSlice() { Allocator::free(_slice.data, _capacity); }
~OwnedSlice() {
if (_slice.data != nullptr) {
DCHECK(_capacity != 0);
Allocator::free(_slice.data, _capacity);
}
}

const Slice& slice() const { return _slice; }

Expand Down
14 changes: 4 additions & 10 deletions be/src/vec/common/schema_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,16 +530,10 @@ Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,

Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
const ParseConfig& config) {
try {
// Parse each variant column from raw string column
RETURN_IF_ERROR(
vectorized::schema_util::_parse_variant_columns(block, variant_pos, config));
} catch (const doris::Exception& e) {
// TODO more graceful, max_filter_ratio
LOG(WARNING) << "encounter execption " << e.to_string();
return Status::InternalError(e.to_string());
}
return Status::OK();
// Parse each variant column from raw string column
RETURN_IF_CATCH_EXCEPTION({
return vectorized::schema_util::_parse_variant_columns(block, variant_pos, config);
});
}

Status encode_variant_sparse_subcolumns(ColumnObject& column) {
Expand Down
49 changes: 18 additions & 31 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2054,7 +2054,6 @@ Status OrcReader::on_string_dicts_loaded(
orc::StringDictionary* dict = file_column_name_to_dict_map_iter->second;

std::vector<StringRef> dict_values;
std::unordered_map<StringRef, int64_t> dict_value_to_code;
size_t max_value_length = 0;
uint64_t dictionaryCount = dict->dictionaryOffset.size() - 1;
if (dictionaryCount == 0) {
Expand All @@ -2074,7 +2073,6 @@ Status OrcReader::on_string_dicts_loaded(
max_value_length = length;
}
dict_values.emplace_back(dict_value);
dict_value_to_code[dict_value] = i;
}
dict_value_column->insert_many_strings_overflow(&dict_values[0], dict_values.size(),
max_value_length);
Expand Down Expand Up @@ -2113,58 +2111,47 @@ Status OrcReader::on_string_dicts_loaded(
++index;
}

// 2.2 Execute conjuncts and filter block.
std::vector<uint32_t> columns_to_filter(1, dict_pos);
int column_to_keep = temp_block.columns();
// 2.2 Execute conjuncts.
if (dict_pos != 0) {
// VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
// The following process may be tricky and time-consuming, but we have no other way.
temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
}
RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
ctxs, &temp_block, columns_to_filter, column_to_keep)));
IColumn::Filter result_filter(temp_block.rows(), 1);
bool can_filter_all;
RETURN_IF_ERROR(VExprContext::execute_conjuncts(ctxs, nullptr, &temp_block, &result_filter,
&can_filter_all));
if (dict_pos != 0) {
// We have to clean the first column to insert right data.
temp_block.get_by_position(0).column->assume_mutable()->clear();
}

// Check some conditions.
ColumnPtr& dict_column = temp_block.get_by_position(dict_pos).column;
// If dict_column->size() == 0, can filter this stripe.
if (dict_column->size() == 0) {
// If can_filter_all = true, can filter this stripe.
if (can_filter_all) {
*is_stripe_filtered = true;
return Status::OK();
}

// 3. Get dict codes.
std::vector<int32_t> dict_codes;
for (size_t i = 0; i < result_filter.size(); ++i) {
if (result_filter[i]) {
dict_codes.emplace_back(i);
}
}

// About Performance: if dict_column size is too large, it will generate a large IN filter.
if (dict_column->size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
if (dict_codes.size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
it = _dict_filter_cols.erase(it);
for (auto& ctx : ctxs) {
_non_dict_filter_conjuncts.emplace_back(ctx);
}
continue;
}

// 3. Get dict codes.
std::vector<int32_t> dict_codes;
if (dict_column->is_nullable()) {
const ColumnNullable* nullable_column =
static_cast<const ColumnNullable*>(dict_column.get());
const ColumnString* nested_column = static_cast<const ColumnString*>(
nullable_column->get_nested_column_ptr().get());
for (int i = 0; i < nested_column->size(); ++i) {
StringRef dict_value = nested_column->get_data_at(i);
dict_codes.emplace_back(dict_value_to_code[dict_value]);
}
} else {
for (int i = 0; i < dict_column->size(); ++i) {
StringRef dict_value = dict_column->get_data_at(i);
dict_codes.emplace_back(dict_value_to_code[dict_value]);
}
}

// 4. Rewrite conjuncts.
RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable()));
RETURN_IF_ERROR(_rewrite_dict_conjuncts(
dict_codes, slot_id, temp_block.get_by_position(dict_pos).column->is_nullable()));
++it;
}
return Status::OK();
Expand Down
11 changes: 0 additions & 11 deletions be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ Status ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t
total_length += l;
}

_dict_value_to_code.reserve(num_values);
// For insert_many_strings_overflow
_dict_data.resize(total_length + ColumnString::MAX_STRINGS_OVERFLOW_SIZE);
_max_value_length = 0;
Expand All @@ -55,7 +54,6 @@ Status ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t
offset_cursor += 4;
memcpy(&_dict_data[offset], dict_item_address + offset_cursor, l);
_dict_items.emplace_back(&_dict_data[offset], l);
_dict_value_to_code[StringRef(&_dict_data[offset], l)] = i;
offset_cursor += l;
offset += l;
if (offset_cursor > length) {
Expand All @@ -77,15 +75,6 @@ Status ByteArrayDictDecoder::read_dict_values_to_column(MutableColumnPtr& doris_
return Status::OK();
}

Status ByteArrayDictDecoder::get_dict_codes(const ColumnString* string_column,
std::vector<int32_t>* dict_codes) {
for (int i = 0; i < string_column->size(); ++i) {
StringRef dict_value = string_column->get_data_at(i);
dict_codes->emplace_back(_dict_value_to_code[dict_value]);
}
return Status::OK();
}

MutableColumnPtr ByteArrayDictDecoder::convert_dict_column_to_string_column(
const ColumnInt32* dict_column) {
auto res = ColumnString::create();
Expand Down
4 changes: 0 additions & 4 deletions be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,12 @@ class ByteArrayDictDecoder final : public BaseDictDecoder {

Status read_dict_values_to_column(MutableColumnPtr& doris_column) override;

Status get_dict_codes(const ColumnString* column_string,
std::vector<int32_t>* dict_codes) override;

MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override;

protected:
// For dictionary encoding
std::vector<StringRef> _dict_items;
std::vector<uint8_t> _dict_data;
size_t _max_value_length;
std::unordered_map<StringRef, int32_t> _dict_value_to_code;
};
} // namespace doris::vectorized
5 changes: 0 additions & 5 deletions be/src/vec/exec/format/parquet/decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ class Decoder {
return Status::NotSupported("read_dict_values_to_column is not supported");
}

virtual Status get_dict_codes(const ColumnString* column_string,
std::vector<int32_t>* dict_codes) {
return Status::NotSupported("get_dict_codes is not supported");
}

virtual MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) {
LOG(FATAL) << "Method convert_dict_column_to_string_column is not supported";
__builtin_unreachable();
Expand Down
14 changes: 0 additions & 14 deletions be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,8 @@ class FixLengthDictDecoder final : public BaseDictDecoder {
_dict = std::move(dict);
char* dict_item_address = reinterpret_cast<char*>(_dict.get());
_dict_items.resize(num_values);
_dict_value_to_code.reserve(num_values);
for (size_t i = 0; i < num_values; ++i) {
_dict_items[i] = dict_item_address;
_dict_value_to_code[StringRef(_dict_items[i], _type_length)] = i;
dict_item_address += _type_length;
}
return Status::OK();
Expand All @@ -128,17 +126,6 @@ class FixLengthDictDecoder final : public BaseDictDecoder {
return Status::OK();
}

Status get_dict_codes(const ColumnString* string_column,
std::vector<int32_t>* dict_codes) override {
size_t size = string_column->size();
dict_codes->reserve(size);
for (int i = 0; i < size; ++i) {
StringRef dict_value = string_column->get_data_at(i);
dict_codes->emplace_back(_dict_value_to_code[dict_value]);
}
return Status::OK();
}

MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override {
auto res = ColumnString::create();
std::vector<StringRef> dict_values(dict_column->size());
Expand All @@ -149,7 +136,6 @@ class FixLengthDictDecoder final : public BaseDictDecoder {
res->insert_many_strings(&dict_values[0], dict_values.size());
return res;
}
std::unordered_map<StringRef, int32_t> _dict_value_to_code;
// For dictionary encoding
std::vector<char*> _dict_items;
};
Expand Down
Loading

0 comments on commit 7530c63

Please sign in to comment.