Skip to content

Commit

Permalink
[cherrypick](branch21) pick some exception handleing pr in page io (#…
Browse files Browse the repository at this point in the history
…40883)

## Proposed changes

pick
#39536
#39913
#39964
#40020

---------

Co-authored-by: yiguolei <[email protected]>
  • Loading branch information
yiguolei and Doris-Extras authored Sep 16, 2024
1 parent 782973e commit a6e2364
Show file tree
Hide file tree
Showing 24 changed files with 97 additions and 89 deletions.
7 changes: 6 additions & 1 deletion be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ bool MemTable::need_agg() const {
return false;
}

Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) {
Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
size_t same_keys_num = _sort();
if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
if (_keys_type == KeysType::DUP_KEYS && _tablet_schema->num_key_columns() == 0) {
Expand All @@ -529,4 +529,9 @@ Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) {
return Status::OK();
}

Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) {
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_to_block(res));
return Status::OK();
}

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ class MemTable {
void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row,
RowInBlock* row_in_skiplist);

// Used to wrapped by to_block to do exception handle logic
Status _to_block(std::unique_ptr<vectorized::Block>* res);

private:
int64_t _tablet_id;
bool _enable_unique_key_mow = false;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
return Status::OK();
}

Status SegmentFlusher::_parse_variant_columns(vectorized::Block& block) {
Status SegmentFlusher::_internal_parse_variant_columns(vectorized::Block& block) {
size_t num_rows = block.rows();
if (num_rows == 0) {
return Status::OK();
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/rowset/segment_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,11 @@ class SegmentFlusher {
bool need_buffering();

private:
Status _parse_variant_columns(vectorized::Block& block);
// This method will catch exception when allocate memory failed
Status _parse_variant_columns(vectorized::Block& block) {
RETURN_IF_CATCH_EXCEPTION({ return _internal_parse_variant_columns(block); });
}
Status _internal_parse_variant_columns(vectorized::Block& block);
Status _add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
const vectorized::Block* block, size_t row_offset, size_t row_num);
Status _add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
Expand Down
14 changes: 8 additions & 6 deletions be/src/olap/rowset/segment_v2/binary_dict_page.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,22 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) {
}
}

OwnedSlice BinaryDictPageBuilder::finish() {
Status BinaryDictPageBuilder::finish(OwnedSlice* slice) {
if (VLOG_DEBUG_IS_ON && _encoding_type == DICT_ENCODING) {
VLOG_DEBUG << "dict page size:" << _dict_builder->size();
}

DCHECK(!_finished);
_finished = true;

OwnedSlice data_slice = _data_page_builder->finish();
OwnedSlice data_slice;
RETURN_IF_ERROR(_data_page_builder->finish(&data_slice));
// TODO(gaodayue) separate page header and content to avoid this copy
_buffer.append(data_slice.slice().data, data_slice.slice().size);
RETURN_IF_CATCH_EXCEPTION(
{ _buffer.append(data_slice.slice().data, data_slice.slice().size); });
encode_fixed32_le(&_buffer[0], _encoding_type);
return _buffer.build();
*slice = _buffer.build();
return Status::OK();
}

Status BinaryDictPageBuilder::reset() {
Expand Down Expand Up @@ -183,8 +186,7 @@ uint64_t BinaryDictPageBuilder::size() const {
}

Status BinaryDictPageBuilder::get_dictionary_page(OwnedSlice* dictionary_page) {
*dictionary_page = _dict_builder->finish();
return Status::OK();
return _dict_builder->finish(dictionary_page);
}

Status BinaryDictPageBuilder::get_first_value(void* value) const {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/binary_dict_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class BinaryDictPageBuilder : public PageBuilderHelper<BinaryDictPageBuilder> {

Status add(const uint8_t* vals, size_t* count) override;

OwnedSlice finish() override;
Status finish(OwnedSlice* slice) override;

Status reset() override;

Expand Down
25 changes: 14 additions & 11 deletions be/src/olap/rowset/segment_v2/binary_plain_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,22 @@ class BinaryPlainPageBuilder : public PageBuilderHelper<BinaryPlainPageBuilder<T
return Status::OK();
}

OwnedSlice finish() override {
Status finish(OwnedSlice* slice) override {
DCHECK(!_finished);
_finished = true;
// Set up trailer
for (uint32_t _offset : _offsets) {
put_fixed32_le(&_buffer, _offset);
}
put_fixed32_le(&_buffer, _offsets.size());
if (_offsets.size() > 0) {
_copy_value_at(0, &_first_value);
_copy_value_at(_offsets.size() - 1, &_last_value);
}
return _buffer.build();
RETURN_IF_CATCH_EXCEPTION({
// Set up trailer
for (uint32_t _offset : _offsets) {
put_fixed32_le(&_buffer, _offset);
}
put_fixed32_le(&_buffer, _offsets.size());
if (_offsets.size() > 0) {
_copy_value_at(0, &_first_value);
_copy_value_at(_offsets.size() - 1, &_last_value);
}
*slice = _buffer.build();
});
return Status::OK();
}

Status reset() override {
Expand Down
23 changes: 13 additions & 10 deletions be/src/olap/rowset/segment_v2/binary_prefix_page.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,21 @@ Status BinaryPrefixPageBuilder::add(const uint8_t* vals, size_t* add_count) {
return Status::OK();
}

OwnedSlice BinaryPrefixPageBuilder::finish() {
Status BinaryPrefixPageBuilder::finish(OwnedSlice* slice) {
DCHECK(!_finished);
_finished = true;
put_fixed32_le(&_buffer, (uint32_t)_count);
uint8_t restart_point_internal = RESTART_POINT_INTERVAL;
_buffer.append(&restart_point_internal, 1);
auto restart_point_size = _restart_points_offset.size();
for (uint32_t i = 0; i < restart_point_size; ++i) {
put_fixed32_le(&_buffer, _restart_points_offset[i]);
}
put_fixed32_le(&_buffer, restart_point_size);
return _buffer.build();
RETURN_IF_CATCH_EXCEPTION({
put_fixed32_le(&_buffer, (uint32_t)_count);
uint8_t restart_point_internal = RESTART_POINT_INTERVAL;
_buffer.append(&restart_point_internal, 1);
auto restart_point_size = _restart_points_offset.size();
for (uint32_t i = 0; i < restart_point_size; ++i) {
put_fixed32_le(&_buffer, _restart_points_offset[i]);
}
put_fixed32_le(&_buffer, restart_point_size);
*slice = _buffer.build();
});
return Status::OK();
}

const uint8_t* BinaryPrefixPageDecoder::_decode_value_lengths(const uint8_t* ptr, uint32_t* shared,
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/binary_prefix_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class BinaryPrefixPageBuilder : public PageBuilderHelper<BinaryPrefixPageBuilder

Status add(const uint8_t* vals, size_t* add_count) override;

OwnedSlice finish() override;
Status finish(OwnedSlice* slice) override;

Status reset() override {
_restart_points_offset.clear();
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/segment_v2/bitshuffle_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,13 @@ class BitshufflePageBuilder : public PageBuilderHelper<BitshufflePageBuilder<Typ
return Status::OK();
}

OwnedSlice finish() override {
Status finish(OwnedSlice* slice) override {
if (_count > 0) {
_first_value = cell(0);
_last_value = cell(_count - 1);
}
return _finish(SIZE_OF_TYPE);
RETURN_IF_CATCH_EXCEPTION({ *slice = _finish(SIZE_OF_TYPE); });
return Status::OK();
}

Status reset() override {
Expand Down
10 changes: 6 additions & 4 deletions be/src/olap/rowset/segment_v2/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ class NullBitmapBuilder {
// Returns whether the building nullmap contains nullptr
bool has_null() const { return _has_null; }

OwnedSlice finish() {
Status finish(OwnedSlice* slice) {
_rle_encoder.Flush();
return _bitmap_buf.build();
RETURN_IF_CATCH_EXCEPTION({ *slice = _bitmap_buf.build(); });
return Status::OK();
}

void reset() {
Expand Down Expand Up @@ -723,14 +724,15 @@ Status ScalarColumnWriter::finish_current_page() {

// build data page body : encoded values + [nullmap]
std::vector<Slice> body;
OwnedSlice encoded_values = _page_builder->finish();
OwnedSlice encoded_values;
RETURN_IF_ERROR(_page_builder->finish(&encoded_values));
RETURN_IF_ERROR(_page_builder->reset());
body.push_back(encoded_values.slice());

OwnedSlice nullmap;
if (_null_bitmap_builder != nullptr) {
if (is_nullable() && _null_bitmap_builder->has_null()) {
nullmap = _null_bitmap_builder->finish();
RETURN_IF_ERROR(_null_bitmap_builder->finish(&nullmap));
body.push_back(nullmap.slice());
}
_null_bitmap_builder->reset();
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/segment_v2/frame_of_reference_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ class FrameOfReferencePageBuilder : public PageBuilderHelper<FrameOfReferencePag
return Status::OK();
}

OwnedSlice finish() override {
Status finish(OwnedSlice* slice) override {
DCHECK(!_finished);
_finished = true;
_encoder->flush();
return _buf.build();
RETURN_IF_CATCH_EXCEPTION({ *slice = _buf.build(); });
return Status::OK();
}

Status reset() override {
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ Status IndexedColumnWriter::_finish_current_data_page(size_t& num_val) {
ordinal_t first_ordinal = _num_values - num_values_in_page;

// IndexedColumn doesn't have NULLs, thus data page body only contains encoded values
OwnedSlice page_body = _data_page_builder->finish();
OwnedSlice page_body;
RETURN_IF_ERROR(_data_page_builder->finish(&page_body));
RETURN_IF_ERROR(_data_page_builder->reset());

PageFooterPB footer;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/page_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class PageBuilder {

// Finish building the current page, return the encoded data.
// This api should be followed by reset() before reusing the builder
virtual OwnedSlice finish() = 0;
// It will return error status when memory allocated failed during finish
virtual Status finish(OwnedSlice* owned_slice) = 0;

// Get the dictionary page for dictionary encoding mode column.
virtual Status get_dictionary_page(OwnedSlice* dictionary_page) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/page_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ Status PageIO::write_page(io::FileWriter* writer, const std::vector<Slice>& body
return Status::OK();
}

Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer) {
Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer) {
opts.sanity_check();
opts.stats->total_pages_num++;

Expand Down
11 changes: 10 additions & 1 deletion be/src/olap/rowset/segment_v2/page_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,17 @@ class PageIO {
// `handle' holds the memory of page data,
// `body' points to page body,
// `footer' stores the page footer.
// This method is exception safe, it will failed when allocate memory failed.
static Status read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer);
Slice* body, PageFooterPB* footer) {
RETURN_IF_CATCH_EXCEPTION(
{ return read_and_decompress_page_(opts, handle, body, footer); });
}

private:
// An internal method that not deal with exception.
static Status read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer);
};

} // namespace segment_v2
Expand Down
18 changes: 11 additions & 7 deletions be/src/olap/rowset/segment_v2/plain_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,18 @@ class PlainPageBuilder : public PageBuilderHelper<PlainPageBuilder<Type> > {
return Status::OK();
}

OwnedSlice finish() override {
Status finish(OwnedSlice* slice) override {
encode_fixed32_le((uint8_t*)&_buffer[0], _count);
if (_count > 0) {
_first_value.assign_copy(&_buffer[PLAIN_PAGE_HEADER_SIZE], SIZE_OF_TYPE);
_last_value.assign_copy(&_buffer[PLAIN_PAGE_HEADER_SIZE + (_count - 1) * SIZE_OF_TYPE],
SIZE_OF_TYPE);
}
return _buffer.build();
RETURN_IF_CATCH_EXCEPTION({
if (_count > 0) {
_first_value.assign_copy(&_buffer[PLAIN_PAGE_HEADER_SIZE], SIZE_OF_TYPE);
_last_value.assign_copy(
&_buffer[PLAIN_PAGE_HEADER_SIZE + (_count - 1) * SIZE_OF_TYPE],
SIZE_OF_TYPE);
}
*slice = _buffer.build();
});
return Status::OK();
}

Status reset() override {
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/segment_v2/rle_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,15 @@ class RlePageBuilder : public PageBuilderHelper<RlePageBuilder<Type> > {
return Status::OK();
}

OwnedSlice finish() override {
Status finish(OwnedSlice* slice) override {
DCHECK(!_finished);
_finished = true;
// here should Flush first and then encode the count header
// or it will lead to a bug if the header is less than 8 byte and the data is small
_rle_encoder->Flush();
encode_fixed32_le(&_buf[0], _count);
return _buf.build();
*slice = _buf.build();
return Status::OK();
}

Status reset() override {
Expand Down
9 changes: 0 additions & 9 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,15 +463,6 @@ Status RuntimeState::append_error_msg_to_file(std::function<std::string()> line,
return Status::OK();
}

int64_t RuntimeState::get_load_mem_limit() {
// TODO: the code is abandoned, it can be deleted after v1.3
if (_query_options.__isset.load_mem_limit && _query_options.load_mem_limit > 0) {
return _query_options.load_mem_limit;
} else {
return _query_mem_tracker->limit();
}
}

void RuntimeState::resize_op_id_to_local_state(int operator_size) {
_op_id_to_local_state.resize(-operator_size);
}
Expand Down
4 changes: 0 additions & 4 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,6 @@ class RuntimeState {

std::vector<TErrorTabletInfo>& error_tablet_infos() { return _error_tablet_infos; }

// get mem limit for load channel
// if load mem limit is not set, or is zero, using query mem limit instead.
int64_t get_load_mem_limit();

// local runtime filter mgr, the runtime filter do not have remote target or
// not need local merge should regist here. the instance exec finish, the local
// runtime filter mgr can release the memory of local runtime filter
Expand Down
12 changes: 3 additions & 9 deletions be/src/vec/common/schema_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,15 +545,9 @@ Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,

Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
const ParseContext& ctx) {
try {
// Parse each variant column from raw string column
RETURN_IF_ERROR(vectorized::schema_util::_parse_variant_columns(block, variant_pos, ctx));
} catch (const doris::Exception& e) {
// TODO more graceful, max_filter_ratio
LOG(WARNING) << "encounter execption " << e.to_string();
return Status::InternalError(e.to_string());
}
return Status::OK();
// Parse each variant column from raw string column
RETURN_IF_CATCH_EXCEPTION(
{ return vectorized::schema_util::_parse_variant_columns(block, variant_pos, ctx); });
}

void finalize_variant_columns(Block& block, const std::vector<int>& variant_pos,
Expand Down
2 changes: 0 additions & 2 deletions be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ void VNodeChannel::_open_internal(bool is_incremental) {

request->set_num_senders(_parent->_num_senders);
request->set_need_gen_rollup(false); // Useless but it is a required field in pb
request->set_load_mem_limit(_parent->_load_mem_limit);
request->set_load_channel_timeout_s(_parent->_load_channel_timeout_s);
request->set_is_high_priority(_parent->_is_high_priority);
request->set_sender_ip(BackendOptions::get_localhost());
Expand Down Expand Up @@ -1245,7 +1244,6 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
_max_wait_exec_timer = ADD_TIMER(profile, "MaxWaitExecTime");
_add_batch_number = ADD_COUNTER(profile, "NumberBatchAdded", TUnit::UNIT);
_num_node_channels = ADD_COUNTER(profile, "NumberNodeChannels", TUnit::UNIT);
_load_mem_limit = state->get_load_mem_limit();

#ifdef DEBUG
// check: tablet ids should be unique
Expand Down
3 changes: 0 additions & 3 deletions be/src/vec/sink/writer/vtablet_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -660,9 +660,6 @@ class VTabletWriter final : public AsyncResultWriter {
RuntimeProfile::Counter* _add_batch_number = nullptr;
RuntimeProfile::Counter* _num_node_channels = nullptr;

// load mem limit is for remote load channel
int64_t _load_mem_limit = -1;

// the timeout of load channels opened by this tablet sink. in second
int64_t _load_channel_timeout_s = 0;

Expand Down
Loading

0 comments on commit a6e2364

Please sign in to comment.