Skip to content

Commit

Permalink
[opt](load) check -238 segment number limit earlier (#39045)
Browse files Browse the repository at this point in the history
## Proposed changes

Previously we check segment number when build rowset because we don't
want throw an error in the intermediate state, hoping segment compaction
can reduce segment number later.

This PR adds a earlier segment number check, while doing segment
compaction.
Immediately throw an error if `_segcompacted_point` is greater than the
segment number limit.

---------

Co-authored-by: Xin Liao <[email protected]>
  • Loading branch information
kaijchen and liaoxin01 authored Sep 23, 2024
1 parent e993b0f commit bb7c064
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 22 deletions.
38 changes: 19 additions & 19 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,26 +515,27 @@ Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, u
return Status::OK();
}

// return true if there isn't any flying segcompaction, otherwise return false
bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() {
return !_is_doing_segcompaction.exchange(true);
}

Status BetaRowsetWriter::_segcompaction_if_necessary() {
Status status = Status::OK();
// leave _check_and_set_is_doing_segcompaction as the last condition
// otherwise _segcompacting_cond will never get notified
// if not doing segcompaction, just check segment number
if (!config::enable_segcompaction || !_context.enable_segcompaction ||
!_context.tablet_schema->cluster_key_idxes().empty() ||
_context.tablet_schema->num_variant_columns() > 0 ||
!_check_and_set_is_doing_segcompaction()) {
_context.tablet_schema->num_variant_columns() > 0) {
return _check_segment_number_limit(_num_segment);
}
// leave _is_doing_segcompaction as the last condition
// otherwise _segcompacting_cond will never get notified
if (_is_doing_segcompaction.exchange(true)) {
return status;
}
if (_segcompaction_status.load() != OK) {
status = Status::Error<SEGCOMPACTION_FAILED>(
"BetaRowsetWriter::_segcompaction_if_necessary meet invalid state, error code: {}",
_segcompaction_status.load());
} else if ((_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) {
} else {
status = _check_segment_number_limit(_num_segcompacted);
}
if (status.ok() && (_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) {
SegCompactionCandidatesSharedPtr segments;
status = _find_longest_consecutive_small_segment(segments);
if (LIKELY(status.ok()) && (!segments->empty())) {
Expand Down Expand Up @@ -720,7 +721,8 @@ Status BetaRowsetWriter::_close_file_writers() {
Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
RETURN_IF_ERROR(_close_file_writers());

RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(),
const auto total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(total_segment_num),
"too many segments when build new rowset");
RETURN_IF_ERROR(_build_rowset_meta(_rowset_meta.get(), true));
if (_is_pending) {
Expand Down Expand Up @@ -909,11 +911,10 @@ Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(
return Status::OK();
}

Status BaseBetaRowsetWriter::_check_segment_number_limit() {
size_t total_segment_num = _num_segment + 1;
Status BaseBetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
{ total_segment_num = dp->param("segnum", 1024); });
if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
{ segnum = dp->param("segnum", 1024); });
if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) {
return Status::Error<TOO_MANY_SEGMENTS>(
"too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, "
"_num_segment:{}, rowset_num_rows:{}",
Expand All @@ -923,11 +924,10 @@ Status BaseBetaRowsetWriter::_check_segment_number_limit() {
return Status::OK();
}

Status BetaRowsetWriter::_check_segment_number_limit() {
size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
Status BetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
{ total_segment_num = dp->param("segnum", 1024); });
if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
{ segnum = dp->param("segnum", 1024); });
if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) {
return Status::Error<TOO_MANY_SEGMENTS>(
"too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, "
"_segcompacted_point:{}, _num_segcompacted:{}, rowset_num_rows:{}",
Expand Down
5 changes: 2 additions & 3 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class BaseBetaRowsetWriter : public RowsetWriter {
Status _build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num = false);
Status _create_file_writer(const std::string& path, io::FileWriterPtr& file_writer);
virtual Status _close_file_writers();
virtual Status _check_segment_number_limit();
virtual Status _check_segment_number_limit(size_t segnum);
virtual int64_t _num_seg() const;
// build a tmp rowset for load segment to calc delete_bitmap for this segment
Status _build_tmp(RowsetSharedPtr& rowset_ptr);
Expand Down Expand Up @@ -298,7 +298,7 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter {
// segment compaction
friend class SegcompactionWorker;
Status _close_file_writers() override;
Status _check_segment_number_limit() override;
Status _check_segment_number_limit(size_t segnum) override;
int64_t _num_seg() const override;
Status _wait_flying_segcompaction();
Status _create_segment_writer_for_segcompaction(
Expand All @@ -307,7 +307,6 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter {
Status _segcompaction_rename_last_segments();
Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, int32_t segment_id);
Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr& segments);
bool _check_and_set_is_doing_segcompaction();
Status _rename_compacted_segments(int64_t begin, int64_t end);
Status _rename_compacted_segment_plain(uint64_t seg_id);
Status _rename_compacted_indices(int64_t begin, int64_t end, uint64_t seg_id);
Expand Down

0 comments on commit bb7c064

Please sign in to comment.