Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian committed Sep 19, 2024
1 parent 07a5362 commit faee48f
Show file tree
Hide file tree
Showing 21 changed files with 109 additions and 75 deletions.
23 changes: 15 additions & 8 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,18 @@ Status CloudBaseCompaction::prepare_compact() {
for (auto& rs : _input_rowsets) {
_input_row_num += rs->num_rows();
_input_segments += rs->num_segments();
_input_rowsets_size += rs->data_disk_size();
_input_rowsets_data_size += rs->data_disk_size();
_input_rowsets_total_size += rs->total_disk_size();
}
LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_data_size", _input_rowsets_size);
.tag("input_rowsets_data_size", _input_rowsets_data_size)
.tag("input_rowsets_index_size", _input_rowsets_index_size)
.tag("input_rowsets_total_size", _input_rowsets_total_size);
return st;
}

Expand Down Expand Up @@ -270,17 +273,21 @@ Status CloudBaseCompaction::execute_compact() {
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_data_size", _input_rowsets_size)
.tag("input_rowsets_data_size", _input_rowsets_data_size)
.tag("input_rowsets_index_size", _input_rowsets_index_size)
.tag("input_rowsets_total", _input_rowsets_total_size)
.tag("output_rows", _output_rowset->num_rows())
.tag("output_segments", _output_rowset->num_segments())
.tag("output_data_size", _output_rowset->data_disk_size());
.tag("output_rowset_data_size", _output_rowset->data_disk_size())
.tag("output_rowset_index_size", _output_rowset->index_disk_size())
.tag("output_rowset_total_size", _output_rowset->total_disk_size());

//_compaction_succeed = true;
_state = CompactionState::SUCCESS;

DorisMetrics::instance()->base_compaction_deltas_total->increment(_input_rowsets.size());
DorisMetrics::instance()->base_compaction_bytes_total->increment(_input_rowsets_size);
base_output_size << _output_rowset->data_disk_size();
DorisMetrics::instance()->base_compaction_bytes_total->increment(_input_rowsets_total_size);
base_output_size << _output_rowset->total_disk_size();

return Status::OK();
}
Expand All @@ -302,8 +309,8 @@ Status CloudBaseCompaction::modify_rowsets() {
compaction_job->set_output_cumulative_point(cloud_tablet()->cumulative_layer_point());
compaction_job->set_num_input_rows(_input_row_num);
compaction_job->set_num_output_rows(_output_rowset->num_rows());
compaction_job->set_size_input_rowsets(_input_rowsets_size);
compaction_job->set_size_output_rowsets(_output_rowset->data_disk_size());
compaction_job->set_size_input_rowsets(_input_rowsets_total_size);
compaction_job->set_size_output_rowsets(_output_rowset->total_disk_size());
compaction_job->set_num_input_segments(_input_segments);
compaction_job->set_num_output_segments(_output_rowset->num_segments());
compaction_job->set_num_input_rowsets(_input_rowsets.size());
Expand Down
24 changes: 16 additions & 8 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,19 @@ Status CloudCumulativeCompaction::prepare_compact() {
for (auto& rs : _input_rowsets) {
_input_row_num += rs->num_rows();
_input_segments += rs->num_segments();
_input_rowsets_size += rs->data_disk_size();
_input_rowsets_data_size += rs->data_disk_size();
_input_rowsets_index_size += rs->index_disk_size();
_input_rowsets_total_size += rs->total_disk_size();
}
LOG_INFO("start CloudCumulativeCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_data_size", _input_rowsets_size)
.tag("input_rowsets_data_size", _input_rowsets_data_size)
.tag("input_rowsets_index_size", _input_rowsets_index_size)
.tag("input_rowsets_total_size", _input_rowsets_total_size)
.tag("tablet_max_version", cloud_tablet()->max_version_unlocked())
.tag("cumulative_point", cloud_tablet()->cumulative_layer_point())
.tag("num_rowsets", cloud_tablet()->fetch_add_approximate_num_rowsets(0))
Expand Down Expand Up @@ -199,10 +203,14 @@ Status CloudCumulativeCompaction::execute_compact() {
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_data_size", _input_rowsets_size)
.tag("input_rowsets_data_size", _input_rowsets_data_size)
.tag("input_rowsets_index_size", _input_rowsets_index_size)
.tag("input_rowsets_total_size", _input_rowsets_total_size)
.tag("output_rows", _output_rowset->num_rows())
.tag("output_segments", _output_rowset->num_segments())
.tag("output_data_size", _output_rowset->data_disk_size())
.tag("output_rowset_data_size", _output_rowset->data_disk_size())
.tag("output_rowset_index_size", _output_rowset->index_disk_size())
.tag("output_rowset_total_size", _output_rowset->total_disk_size())
.tag("tablet_max_version", _tablet->max_version_unlocked())
.tag("cumulative_point", cloud_tablet()->cumulative_layer_point())
.tag("num_rowsets", cloud_tablet()->fetch_add_approximate_num_rowsets(0))
Expand All @@ -211,8 +219,8 @@ Status CloudCumulativeCompaction::execute_compact() {
_state = CompactionState::SUCCESS;

DorisMetrics::instance()->cumulative_compaction_deltas_total->increment(_input_rowsets.size());
DorisMetrics::instance()->cumulative_compaction_bytes_total->increment(_input_rowsets_size);
cumu_output_size << _output_rowset->data_disk_size();
DorisMetrics::instance()->cumulative_compaction_bytes_total->increment(_input_rowsets_total_size);
cumu_output_size << _output_rowset->total_disk_size();

return Status::OK();
}
Expand Down Expand Up @@ -241,8 +249,8 @@ Status CloudCumulativeCompaction::modify_rowsets() {
compaction_job->set_output_cumulative_point(new_cumulative_point);
compaction_job->set_num_input_rows(_input_row_num);
compaction_job->set_num_output_rows(_output_rowset->num_rows());
compaction_job->set_size_input_rowsets(_input_rowsets_size);
compaction_job->set_size_output_rowsets(_output_rowset->data_disk_size());
compaction_job->set_size_input_rowsets(_input_rowsets_total_size);
compaction_job->set_size_output_rowsets(_output_rowset->total_disk_size());
compaction_job->set_num_input_segments(_input_segments);
compaction_job->set_num_output_segments(_output_rowset->num_segments());
compaction_job->set_num_input_rowsets(_input_rowsets.size());
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_cumulative_compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point(
// if rowsets have no delete version, check output_rowset total disk size satisfies promotion size.
return output_rowset->start_version() == last_cumulative_point &&
(last_delete_version.first != -1 ||
output_rowset->data_disk_size() >= cloud_promotion_size(tablet) ||
output_rowset->total_disk_size() >= cloud_promotion_size(tablet) ||
satisfy_promotion_version)
? output_rowset->end_version() + 1
: last_cumulative_point;
Expand Down
26 changes: 17 additions & 9 deletions be/src/cloud/cloud_full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,19 @@ Status CloudFullCompaction::prepare_compact() {
for (auto& rs : _input_rowsets) {
_input_row_num += rs->num_rows();
_input_segments += rs->num_segments();
_input_rowsets_size += rs->data_disk_size();
_input_rowsets_data_size += rs->data_disk_size();
_input_rowsets_index_size += rs->index_disk_size();
_input_rowsets_total_size += rs->total_disk_size();
}
LOG_INFO("start CloudFullCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_data_size", _input_rowsets_size);
.tag("input_rowsets_data_size", _input_rowsets_data_size)
.tag("input_rowsets_index_size", _input_rowsets_index_size)
.tag("input_rowsets_total_size", _input_rowsets_total_size);
return st;
}

Expand Down Expand Up @@ -162,16 +166,20 @@ Status CloudFullCompaction::execute_compact() {
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_data_size", _input_rowsets_size)
.tag("input_rowsets_data_size", _input_rowsets_data_size)
.tag("input_rowsets_index_size", _input_rowsets_index_size)
.tag("input_rowsets_total_size", _input_rowsets_total_size)
.tag("output_rows", _output_rowset->num_rows())
.tag("output_segments", _output_rowset->num_segments())
.tag("output_data_size", _output_rowset->data_disk_size());
.tag("output_rowset_data_size", _output_rowset->data_disk_size())
.tag("output_rowset_index_size", _output_rowset->index_disk_size())
.tag("output_rowset_total_size", _output_rowset->total_disk_size());

_state = CompactionState::SUCCESS;

DorisMetrics::instance()->full_compaction_deltas_total->increment(_input_rowsets.size());
DorisMetrics::instance()->full_compaction_bytes_total->increment(_input_rowsets_size);
full_output_size << _output_rowset->data_disk_size();
DorisMetrics::instance()->full_compaction_bytes_total->increment(_input_rowsets_total_size);
full_output_size << _output_rowset->total_disk_size();

return Status::OK();
}
Expand All @@ -193,8 +201,8 @@ Status CloudFullCompaction::modify_rowsets() {
compaction_job->set_output_cumulative_point(_output_rowset->end_version() + 1);
compaction_job->set_num_input_rows(_input_row_num);
compaction_job->set_num_output_rows(_output_rowset->num_rows());
compaction_job->set_size_input_rowsets(_input_rowsets_size);
compaction_job->set_size_output_rowsets(_output_rowset->data_disk_size());
compaction_job->set_size_input_rowsets(_input_rowsets_total_size);
compaction_job->set_size_output_rowsets(_output_rowset->total_disk_size());
compaction_job->set_num_input_segments(_input_segments);
compaction_job->set_num_output_segments(_output_rowset->num_segments());
compaction_job->set_num_input_rowsets(_input_rowsets.size());
Expand Down Expand Up @@ -341,7 +349,7 @@ Status CloudFullCompaction::_cloud_full_compaction_update_delete_bitmap(int64_t
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_data_size", _input_rowsets_size)
.tag("input_rowsets_total_size", _input_rowsets_total_size)
.tag("update_bitmap_size", delete_bitmap->delete_bitmap.size());
_tablet->tablet_meta()->delete_bitmap().merge(*delete_bitmap);
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ void CloudRowsetBuilder::update_tablet_stats() {
tablet->fetch_add_approximate_num_rowsets(1);
tablet->fetch_add_approximate_num_segments(_rowset->num_segments());
tablet->fetch_add_approximate_num_rows(_rowset->num_rows());
tablet->fetch_add_approximate_data_size(_rowset->data_disk_size());
tablet->fetch_add_approximate_data_size(_rowset->total_disk_size());
tablet->fetch_add_approximate_cumu_num_rowsets(1);
tablet->fetch_add_approximate_cumu_num_deltas(_rowset->num_segments());
tablet->write_count.fetch_add(1, std::memory_order_relaxed);
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
sc_job->add_txn_ids(rs->txn_id());
sc_job->add_output_versions(rs->end_version());
num_output_rows += rs->num_rows();
size_output_rowsets += rs->data_disk_size();
size_output_rowsets += rs->total_disk_size();
num_output_segments += rs->num_segments();
}
sc_job->set_num_output_rows(num_output_rows);
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ int CloudTablet::delete_expired_stale_rowsets() {
// Records the size of the tablet's base rowset in `_base_size`.
//
// Only a rowset whose start version is 2 is treated as the base rowset; for
// any other rowset this function does nothing. The stored value is whatever
// `rs.total_disk_size()` reports for that rowset.
//
// @param rs  candidate rowset; it is only read, never modified.
void CloudTablet::update_base_size(const Rowset& rs) {
    // Define base rowset as the rowset of version [2-x]
    if (rs.start_version() == 2) {
        // Single store: `_base_size` tracks the rowset's total disk size.
        _base_size = rs.total_disk_size();
    }
}


Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ Status BaseCompaction::execute_compact() {

tablet()->set_last_base_compaction_success_time(UnixMillis());
DorisMetrics::instance()->base_compaction_deltas_total->increment(_input_rowsets.size());
DorisMetrics::instance()->base_compaction_bytes_total->increment(_input_rowsets_size);
DorisMetrics::instance()->base_compaction_bytes_total->increment(_input_rowsets_total_size);

return Status::OK();
}
Expand Down
38 changes: 21 additions & 17 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,10 @@ int64_t Compaction::get_avg_segment_rows() {
if (meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) {
int64_t compaction_goal_size_mbytes = meta->time_series_compaction_goal_size_mbytes();
return (compaction_goal_size_mbytes * 1024 * 1024 * 2) /
(_input_rowsets_size / (_input_row_num + 1) + 1);
(_input_rowsets_data_size / (_input_row_num + 1) + 1);
}
return config::vertical_compaction_max_segment_size /
(_input_rowsets_size / (_input_row_num + 1) + 1);
(_input_rowsets_data_size / (_input_row_num + 1) + 1);
}

CompactionMixin::CompactionMixin(StorageEngine& engine, TabletSharedPtr tablet,
Expand Down Expand Up @@ -294,9 +294,9 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
// build output rowset
RowsetMetaSharedPtr rowset_meta = std::make_shared<RowsetMeta>();
rowset_meta->set_num_rows(_input_row_num);
rowset_meta->set_total_disk_size(_input_rowsets_size);
rowset_meta->set_data_disk_size(_input_rowsets_size);
rowset_meta->set_index_disk_size(_input_index_size);
rowset_meta->set_total_disk_size(_input_rowsets_data_size + _input_rowsets_index_size);
rowset_meta->set_data_disk_size(_input_rowsets_data_size);
rowset_meta->set_index_disk_size(_input_rowsets_index_size);
rowset_meta->set_empty(_input_row_num == 0);
rowset_meta->set_num_segments(_input_num_segments);
rowset_meta->set_segments_overlap(NONOVERLAPPING);
Expand All @@ -309,12 +309,13 @@ Status CompactionMixin::do_compact_ordered_rowsets() {

void CompactionMixin::build_basic_info() {
for (auto& rowset : _input_rowsets) {
_input_rowsets_size += rowset->data_disk_size();
_input_index_size += rowset->index_disk_size();
_input_rowsets_data_size += rowset->data_disk_size();
_input_rowsets_index_size += rowset->index_disk_size();
_input_rowsets_total_size += rowset->total_disk_size();
_input_row_num += rowset->num_rows();
_input_num_segments += rowset->num_segments();
}
COUNTER_UPDATE(_input_rowsets_data_size_counter, _input_rowsets_size);
COUNTER_UPDATE(_input_rowsets_data_size_counter, _input_rowsets_data_size);
COUNTER_UPDATE(_input_row_num_counter, _input_row_num);
COUNTER_UPDATE(_input_segments_num_counter, _input_num_segments);

Expand Down Expand Up @@ -433,8 +434,12 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) {
<< ", disk=" << tablet()->data_dir()->path()
<< ", segments=" << _input_num_segments << ", input_row_num=" << _input_row_num
<< ", output_row_num=" << _output_rowset->num_rows()
<< ", input_rowset_size=" << _input_rowsets_size
<< ", output_rowset_size=" << _output_rowset->data_disk_size()
<< ", input_rowsets_data_size=" << _input_rowsets_data_size
<< ", input_rowsets_index_size=" << _input_rowsets_index_size
<< ", input_rowsets_total_size=" << _input_rowsets_total_size
<< ", output_rowset_data_size=" << _output_rowset->data_disk_size()
<< ", output_rowset_index_size=" << _output_rowset->index_disk_size()
<< ", output_rowset_total_size=" << _output_rowset->total_disk_size()
<< ". elapsed time=" << watch.get_elapse_second() << "s.";
_state = CompactionState::SUCCESS;
return Status::OK();
Expand All @@ -458,8 +463,8 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) {
<< ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version
<< ", current_max_version=" << tablet()->max_version().second
<< ", disk=" << tablet()->data_dir()->path() << ", segments=" << _input_num_segments
<< ", input_rowset_size=" << _input_rowsets_size
<< ", output_rowset_size=" << _output_rowset->data_disk_size()
<< ", input_data_size=" << _input_rowsets_data_size
<< ", output_rowset_size=" << _output_rowset->total_disk_size()
<< ", input_row_num=" << _input_row_num
<< ", output_row_num=" << _output_rowset->num_rows()
<< ", filtered_row_num=" << _stats.filtered_rows
Expand Down Expand Up @@ -781,9 +786,8 @@ Status Compaction::do_inverted_index_compaction() {
}

// index compaction should update total disk size and index disk size
_output_rowset->rowset_meta()->set_data_disk_size(_output_rowset->data_disk_size() +
inverted_index_file_size);
_output_rowset->rowset_meta()->set_total_disk_size(_output_rowset->data_disk_size() +
_output_rowset->rowset_meta()->set_data_disk_size(_output_rowset->data_disk_size());
_output_rowset->rowset_meta()->set_total_disk_size(_output_rowset->total_disk_size() +
inverted_index_file_size);
_output_rowset->rowset_meta()->set_index_disk_size(_output_rowset->index_disk_size() +
inverted_index_file_size);
Expand All @@ -794,8 +798,8 @@ Status Compaction::do_inverted_index_compaction() {
LOG(INFO) << "succeed to do index compaction"
<< ". tablet=" << _tablet->tablet_id() << ", input row number=" << _input_row_num
<< ", output row number=" << _output_rowset->num_rows()
<< ", input_rowset_size=" << _input_rowsets_size
<< ", output_rowset_size=" << _output_rowset->data_disk_size()
<< ", input_rowsets_total_size=" << _input_rowsets_total_size
<< ", output_rowset_total_size=" << _output_rowset->total_disk_size()
<< ", inverted index file size=" << inverted_index_file_size
<< ". elapsed time=" << inverted_watch.get_elapse_second() << "s.";

Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@ class Compaction {
BaseTabletSPtr _tablet;

std::vector<RowsetSharedPtr> _input_rowsets;
int64_t _input_rowsets_size {0};
int64_t _input_rowsets_data_size {0};
int64_t _input_rowsets_index_size {0};
int64_t _input_rowsets_total_size {0};
int64_t _input_row_num {0};
int64_t _input_num_segments {0};
int64_t _input_index_size {0};

Merger::Statistics _stats;

Expand Down
Loading

0 comments on commit faee48f

Please sign in to comment.