Skip to content

Commit

Permalink
[fix](inverted index) cloud mode supports time series (#33414)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzxl1993 authored Apr 10, 2024
1 parent 60acd8c commit 62a5bc3
Show file tree
Hide file tree
Showing 18 changed files with 483 additions and 35 deletions.
17 changes: 11 additions & 6 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,11 @@ Status CloudCumulativeCompaction::execute_compact() {
Status CloudCumulativeCompaction::modify_rowsets() {
// calculate new cumulative point
int64_t input_cumulative_point = cloud_tablet()->cumulative_layer_point();
int64_t new_cumulative_point = _engine.cumu_compaction_policy()->new_cumulative_point(
cloud_tablet(), _output_rowset, _last_delete_version, input_cumulative_point);
auto compaction_policy = cloud_tablet()->tablet_meta()->compaction_policy();
int64_t new_cumulative_point =
_engine.cumu_compaction_policy(compaction_policy)
->new_cumulative_point(cloud_tablet(), _output_rowset, _last_delete_version,
input_cumulative_point);
// commit compaction job
cloud::TabletJobInfoPB job;
auto idx = job.mutable_idx();
Expand Down Expand Up @@ -352,10 +355,12 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
}

size_t compaction_score = 0;
_engine.cumu_compaction_policy()->pick_input_rowsets(
cloud_tablet(), candidate_rowsets, config::cumulative_compaction_max_deltas,
config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version,
&compaction_score);
auto compaction_policy = cloud_tablet()->tablet_meta()->compaction_policy();
_engine.cumu_compaction_policy(compaction_policy)
->pick_input_rowsets(cloud_tablet(), candidate_rowsets,
config::cumulative_compaction_max_deltas,
config::cumulative_compaction_min_deltas, &_input_rowsets,
&_last_delete_version, &compaction_score);

if (_input_rowsets.empty()) {
return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("no suitable versions");
Expand Down
150 changes: 149 additions & 1 deletion be/src/cloud/cloud_cumulative_compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size
return (int64_t)1 << (sizeof(size) * 8 - 1 - __builtin_clzl(size));
}

int CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
Expand Down Expand Up @@ -213,4 +213,152 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point(
: last_cumulative_point;
}

// Selects the rowsets that the next cumulative compaction of a time-series tablet should merge.
//
// Walks `candidate_rowsets` in order, accumulating rowsets into `*input_rowsets`, and stops as
// soon as one of the trigger conditions below is satisfied:
//   1. accumulated disk size reaches time_series_compaction_goal_size_mbytes;
//   2. accumulated compaction score reaches time_series_compaction_file_count_threshold;
//   3. (level threshold >= 2) two or more leading non-level-0 rowsets reach the goal size;
//   4. more than time_series_compaction_time_threshold_seconds elapsed since the last
//      successful cumulative compaction.
// If none applies, `*input_rowsets` is cleared and 0 is returned.
//
// @param tablet               tablet whose meta supplies the time-series thresholds
// @param candidate_rowsets    rowsets to choose from, scanned front to back
// @param max_compaction_score not referenced by this policy
// @param min_compaction_score not referenced by this policy
// @param input_rowsets        [out] cleared on entry, then filled with the picked rowsets
// @param last_delete_version  [out] set to the version of a rowset carrying a delete predicate
//                             when one is met (only if `allow_delete` is false); it is read
//                             after the scan, so the caller's initial value matters
// @param compaction_score     [out] reset to 0 on entry, then the summed score of scanned rowsets
// @param allow_delete         when true, rowsets with delete predicates are treated like any other
// @return the running count of scanned rowsets (`transient_size`) or, for the level-1 paths,
//         the size of `*input_rowsets`; 0 when nothing should be compacted. Note that the
//         returned count can be non-zero while `*input_rowsets` is empty (delete-version path).
int32_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
size_t* compaction_score, bool allow_delete) {
// Nothing is picked for a tablet in TABLET_NOTREADY state; outputs are left untouched.
if (tablet->tablet_state() == TABLET_NOTREADY) {
return 0;
}

int64_t compaction_goal_size_mbytes =
tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();

// transient_size counts rowsets appended during the scan; total_size sums their disk size.
int transient_size = 0;
*compaction_score = 0;
input_rowsets->clear();
int64_t total_size = 0;

for (const auto& rowset : candidate_rowsets) {
// check whether this rowset is delete version
if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
*last_delete_version = rowset->version();
if (!input_rowsets->empty()) {
// we met a delete version, and there were other versions before it.
// we should compact those versions before handing them over to base compaction
break;
} else {
// we met a delete version, and no other versions before it: skip it and continue
input_rowsets->clear();
*compaction_score = 0;
transient_size = 0;
total_size = 0;
continue;
}
}

*compaction_score += rowset->rowset_meta()->get_compaction_score();
total_size += rowset->rowset_meta()->total_disk_size();

transient_size += 1;
input_rowsets->push_back(rowset);

// Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size
if (total_size >= (compaction_goal_size_mbytes * 1024 * 1024)) {
if (input_rowsets->size() == 1 &&
!input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
// Only 1 non-overlapping rowset, skip it
// NOTE(review): transient_size is not reset here, unlike the delete-version skip
// branch above, so later returns include this skipped rowset in the count —
// confirm whether that is intentional.
input_rowsets->clear();
*compaction_score = 0;
total_size = 0;
continue;
}
return transient_size;
}
}

// if there is delete version, do compaction directly
if (last_delete_version->first != -1) {
// if there is only one rowset and not overlapping,
// we do not need to do cumulative compaction
if (input_rowsets->size() == 1 &&
!input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
input_rowsets->clear();
*compaction_score = 0;
}
// transient_size is returned even when input_rowsets was just cleared.
return transient_size;
}

// Condition 2: the number of input files reaches the threshold specified by parameter compaction_file_count_threshold
if (*compaction_score >= tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
return transient_size;
}

// Condition 3: level1 achieve compaction_goal_size
// Collect the leading run of candidates whose compaction level is non-zero; the run ends at
// the first level-0 rowset. level1_rowsets is kept for Condition 4 below as well.
std::vector<RowsetSharedPtr> level1_rowsets;
if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
int64_t continuous_size = 0;
for (const auto& rowset : candidate_rowsets) {
const auto& rs_meta = rowset->rowset_meta();
if (rs_meta->compaction_level() == 0) {
break;
}
level1_rowsets.push_back(rowset);
continuous_size += rs_meta->total_disk_size();
if (level1_rowsets.size() >= 2) {
if (continuous_size >= compaction_goal_size_mbytes * 1024 * 1024) {
// NOTE(review): *compaction_score still holds the value accumulated by the
// first scan; it is not recomputed for the swapped-in rowsets.
input_rowsets->swap(level1_rowsets);
return input_rowsets->size();
}
}
}
}

int64_t now = UnixMillis();
int64_t last_cumu = tablet->last_cumu_compaction_success_time();
// A value of 0 skips the time-based trigger entirely.
if (last_cumu != 0) {
int64_t cumu_interval = now - last_cumu;

// Condition 4: the time interval between compactions exceeds the value specified by parameter compaction_time_threshold_second
// (the threshold is in seconds; cumu_interval is in milliseconds, hence the * 1000)
if (cumu_interval >
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 1000)) {
if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
// Fall back to the level-1 run only when the first scan picked nothing.
if (input_rowsets->empty() && level1_rowsets.size() >= 2) {
input_rowsets->swap(level1_rowsets);
return input_rowsets->size();
}
}
return transient_size;
}
}

// No trigger condition matched: report that there is nothing to compact.
input_rowsets->clear();
*compaction_score = 0;

return 0;
}

int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_compaction_level(
const std::vector<RowsetSharedPtr>& input_rowsets) {
int64_t first_level = 0;
for (size_t i = 0; i < input_rowsets.size(); i++) {
int64_t cur_level = input_rowsets[i]->rowset_meta()->compaction_level();
if (i == 0) {
first_level = cur_level;
} else {
if (first_level != cur_level) {
LOG(ERROR) << "Failed to check compaction level, first_level: " << first_level
<< ", cur_level: " << cur_level;
}
}
}
return first_level + 1;
}

// Decides where the cumulative point should sit after a compaction produced `output_rowset`.
//
// The point is left at `last_cumulative_point` when:
//   - the tablet is not in TABLET_RUNNING state, or
//   - the output rowset has no segments, or
//   - the tablet's time-series level threshold is >= 2 and the output rowset's compaction
//     level is still below 2.
// Otherwise it advances to one past the output rowset's end version.
// `last_delete_version` is not referenced by this policy.
int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_cumulative_point(
        CloudTablet* tablet, const RowsetSharedPtr& output_rowset, Version& last_delete_version,
        int64_t last_cumulative_point) {
    if (tablet->tablet_state() != TABLET_RUNNING) {
        return last_cumulative_point;
    }
    if (output_rowset->num_segments() == 0) {
        return last_cumulative_point;
    }

    const bool uses_levels =
            tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2;
    if (uses_levels && output_rowset->rowset_meta()->compaction_level() < 2) {
        return last_cumulative_point;
    }

    return output_rowset->end_version() + 1;
}

} // namespace doris
63 changes: 54 additions & 9 deletions be/src/cloud/cloud_cumulative_compaction_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,26 @@ namespace doris {
class Tablet;
struct Version;

class CloudSizeBasedCumulativeCompactionPolicy {
// Abstract interface for cumulative compaction policies used with CloudTablet.
// Concrete policies in this file: CloudSizeBasedCumulativeCompactionPolicy and
// CloudTimeSeriesCumulativeCompactionPolicy.
class CloudCumulativeCompactionPolicy {
public:
// Virtual so that policies can be destroyed through a base-class pointer.
virtual ~CloudCumulativeCompactionPolicy() = default;

// Returns the cumulative point to use after a compaction that produced `output_rowset`;
// `last_cumulative_point` is the point before the compaction.
virtual int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr& output_rowset,
Version& last_delete_version,
int64_t last_cumulative_point) = 0;

// Returns a compaction level derived from `input_rowsets` (the size-based policy always
// returns 0; the time-series policy returns the first input's level plus one).
virtual int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) = 0;

// Chooses rowsets from `candidate_rowsets` into `*input_rowsets`, and reports through
// `*last_delete_version` and `*compaction_score`. The meaning of the int32_t result is
// defined by each implementation.
// NOTE(review): the default argument on a virtual function is bound by the static type of
// the call; all overrides in this file repeat the same default (`false`).
virtual int32_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score,
const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets,
Version* last_delete_version, size_t* compaction_score,
bool allow_delete = false) = 0;
};

class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactionPolicy {
public:
CloudSizeBasedCumulativeCompactionPolicy(
int64_t promotion_size = config::compaction_promotion_size_mbytes * 1024 * 1024,
Expand All @@ -43,17 +62,23 @@ class CloudSizeBasedCumulativeCompactionPolicy {
int64_t compaction_min_size = config::compaction_min_size_mbytes * 1024 * 1024,
int64_t promotion_version_count = config::compaction_promotion_version_count);

~CloudSizeBasedCumulativeCompactionPolicy() {}
~CloudSizeBasedCumulativeCompactionPolicy() override = default;

int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr& output_rowset,
Version& last_delete_version, int64_t last_cumulative_point);
Version& last_delete_version,
int64_t last_cumulative_point) override;

int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) override {
return 0;
}

int pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets,
Version* last_delete_version, size_t* compaction_score,
bool allow_delete = false);
int32_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score,
const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets,
Version* last_delete_version, size_t* compaction_score,
bool allow_delete = false) override;

private:
int64_t _level_size(const int64_t size);
Expand All @@ -73,4 +98,24 @@ class CloudSizeBasedCumulativeCompactionPolicy {
int64_t _promotion_version_count;
};

// Cumulative compaction policy for time-series tablets. It holds no state of its own: the
// definitions of these methods read every threshold from the tablet's meta
// (time_series_compaction_* settings).
class CloudTimeSeriesCumulativeCompactionPolicy : public CloudCumulativeCompactionPolicy {
public:
CloudTimeSeriesCumulativeCompactionPolicy() = default;
~CloudTimeSeriesCumulativeCompactionPolicy() override = default;

// Returns `output_rowset->end_version() + 1`, or `last_cumulative_point` unchanged when the
// tablet is not running, the output has no segments, or (level threshold >= 2) the output's
// compaction level is below 2.
int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr& output_rowset,
Version& last_delete_version,
int64_t last_cumulative_point) override;

// Returns the compaction level of the first input rowset plus one; level mismatches among
// the inputs are logged but do not affect the result.
int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) override;

// Picks input rowsets by goal size, file-count threshold, level-1 size, or elapsed time
// since the last successful cumulative compaction; returns 0 when nothing should be
// compacted. `max_compaction_score` / `min_compaction_score` are not used by this policy.
int32_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score,
const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets,
Version* last_delete_version, size_t* compaction_score,
bool allow_delete = false) override;
};

} // namespace doris
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ Status CloudRowsetBuilder::init() {
}
RETURN_IF_ERROR(check_tablet_version_count());

using namespace std::chrono;
std::static_pointer_cast<CloudTablet>(_tablet)->last_load_time_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();

// build tablet schema in request level
_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
*_tablet->tablet_schema());
Expand Down
1 change: 1 addition & 0 deletions be/src/cloud/cloud_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context)
_rowset_meta->set_segments_overlap(_context.segments_overlap);
_rowset_meta->set_txn_id(_context.txn_id);
_rowset_meta->set_txn_expiration(_context.txn_expiration);
_rowset_meta->set_compaction_level(_context.compaction_level);
if (_context.rowset_state == PREPARED || _context.rowset_state == COMMITTED) {
_is_pending = true;
_rowset_meta->set_load_id(_context.load_id);
Expand Down
18 changes: 15 additions & 3 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "io/fs/s3_file_system.h"
#include "io/hdfs_util.h"
#include "olap/cumulative_compaction_policy.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/memtable_flush_executor.h"
#include "olap/storage_policy.h"
#include "runtime/memory/cache_manager.h"
Expand Down Expand Up @@ -68,9 +69,12 @@ int get_base_thread_num() {
CloudStorageEngine::CloudStorageEngine(const UniqueId& backend_uid)
: BaseStorageEngine(Type::CLOUD, backend_uid),
_meta_mgr(std::make_unique<cloud::CloudMetaMgr>()),
_tablet_mgr(std::make_unique<CloudTabletMgr>(*this)),
_cumulative_compaction_policy(
std::make_shared<CloudSizeBasedCumulativeCompactionPolicy>()) {}
_tablet_mgr(std::make_unique<CloudTabletMgr>(*this)) {
_cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] =
std::make_shared<CloudSizeBasedCumulativeCompactionPolicy>();
_cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] =
std::make_shared<CloudTimeSeriesCumulativeCompactionPolicy>();
}

CloudStorageEngine::~CloudStorageEngine() {
stop();
Expand Down Expand Up @@ -766,4 +770,12 @@ Status CloudStorageEngine::get_compaction_status_json(std::string* result) {
return Status::OK();
}

// Returns the cumulative compaction policy registered under `compaction_policy`.
//
// Policy names that are not registered fall back to the size-based policy
// (CUMULATIVE_SIZE_BASED_POLICY), which the constructor always registers.
//
// @param compaction_policy policy name, typically taken from the tablet meta
// @return shared pointer to the matching policy, or to the size-based policy as fallback
std::shared_ptr<CloudCumulativeCompactionPolicy> CloudStorageEngine::cumu_compaction_policy(
        std::string_view compaction_policy) {
    // A single find() replaces the previous contains() + at() pair, which hashed the key twice
    // on the hit path.
    if (auto it = _cumulative_compaction_policies.find(compaction_policy);
        it != _cumulative_compaction_policies.end()) {
        return it->second;
    }
    return _cumulative_compaction_policies.at(CUMULATIVE_SIZE_BASED_POLICY);
}

} // namespace doris
11 changes: 6 additions & 5 deletions be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,6 @@ class CloudStorageEngine final : public BaseStorageEngine {
void get_cumu_compaction(int64_t tablet_id,
std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res);

CloudSizeBasedCumulativeCompactionPolicy* cumu_compaction_policy() const {
return _cumulative_compaction_policy.get();
}

Status submit_compaction_task(const CloudTabletSPtr& tablet, CompactionType compaction_type);

Status get_compaction_status_json(std::string* result);
Expand All @@ -110,6 +106,9 @@ class CloudStorageEngine final : public BaseStorageEngine {
return _submitted_full_compactions.count(tablet_id);
}

std::shared_ptr<CloudCumulativeCompactionPolicy> cumu_compaction_policy(
std::string_view compaction_policy);

private:
void _refresh_storage_vault_info_thread_callback();
void _vacuum_stale_rowsets_thread_callback();
Expand Down Expand Up @@ -151,7 +150,9 @@ class CloudStorageEngine final : public BaseStorageEngine {
std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;

std::shared_ptr<CloudSizeBasedCumulativeCompactionPolicy> _cumulative_compaction_policy;
using CumuPolices =
std::unordered_map<std::string_view, std::shared_ptr<CloudCumulativeCompactionPolicy>>;
CumuPolices _cumulative_compaction_policies;
};

} // namespace doris
18 changes: 18 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_mgr.h"
#include "io/cache/block_file_cache_factory.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset.h"
Expand Down Expand Up @@ -409,6 +410,23 @@ Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_write
}

// Returns the base compaction score of this tablet.
//
// For tablets using the time-series policy, the score is 0 unless some rowset that starts
// below the cumulative layer point carries a delete predicate. In every other case the score
// is the approximate total rowset count minus the approximate cumulative rowset count.
int64_t CloudTablet::get_cloud_base_compaction_score() const {
    if (_tablet_meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) {
        const int64_t cumu_point = cumulative_layer_point();
        bool found_delete = false;
        for (const auto& meta : _tablet_meta->all_rs_metas()) {
            // Only rowsets that start before the cumulative point are considered.
            if (meta->start_version() < cumu_point && meta->has_delete_predicate()) {
                found_delete = true;
                break;
            }
        }
        if (!found_delete) {
            return 0;
        }
    }

    const auto total_rowsets = _approximate_num_rowsets.load(std::memory_order_relaxed);
    const auto cumu_rowsets = _approximate_cumu_num_rowsets.load(std::memory_order_relaxed);
    return total_rowsets - cumu_rowsets;
}
Expand Down
Loading

0 comments on commit 62a5bc3

Please sign in to comment.