Skip to content

Commit

Permalink
fix query error -230 VERSION_ALREADY_MERGED
Browse files Browse the repository at this point in the history
  • Loading branch information
yujun777 committed Dec 8, 2023
1 parent a650266 commit b03e29b
Show file tree
Hide file tree
Showing 54 changed files with 1,093 additions and 222 deletions.
12 changes: 12 additions & 0 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ void AgentServer::start_workers(ExecEnv* exec_env) {
_gc_binlog_workers = std::make_unique<TaskWorkerPool>(
"GC_BINLOG", 1, [&engine](auto&& task) { return gc_binlog_callback(engine, task); });

_visible_version_workers = std::make_unique<TaskWorkerPool>(
"UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return visible_version_callback(engine, task); });

_report_task_workers = std::make_unique<ReportWorker>(
"REPORT_TASK", _master_info, config::report_task_interval_seconds, [&master_info = _master_info] { report_task_callback(master_info); });

Expand Down Expand Up @@ -259,6 +262,15 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
"task(signature={}) has wrong request member = gc_binlog_req", signature);
}
break;
case TTaskType::UPDATE_VISIBLE_VERSION:
if (task.__isset.visible_version_req) {
_visible_version_workers->submit_task(task);
} else {
ret_st = Status::InvalidArgument(
"task(signature={}) has wrong request member = visible_version_req",
signature);
}
break;
default:
ret_st = Status::InvalidArgument("task(signature={}, type={}) has wrong task type",
signature, task_type);
Expand Down
1 change: 1 addition & 0 deletions be/src/agent/agent_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class AgentServer {
std::unique_ptr<TaskWorkerPool> _push_storage_policy_workers;
std::unique_ptr<TopicSubscriber> _topic_subscriber;
std::unique_ptr<TaskWorkerPool> _gc_binlog_workers;
std::unique_ptr<TaskWorkerPool> _visible_version_workers;
};

} // end namespace doris
26 changes: 23 additions & 3 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ bvar::Adder<uint64_t> ALTER_count("task", "ALTER_TABLE");
bvar::Adder<uint64_t> CLONE_count("task", "CLONE");
bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRATE");
bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG");
bvar::Adder<uint64_t> UPDATE_VISIBLE_VERSION_count("task", "UPDATE_VISIBLE_VERSION");

void add_task_count(const TAgentTaskRequest& task, int n) {
// clang-format off
Expand Down Expand Up @@ -378,6 +379,7 @@ void add_task_count(const TAgentTaskRequest& task, int n) {
ADD_TASK_COUNT(CLONE)
ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
ADD_TASK_COUNT(GC_BINLOG)
ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION)
#undef ADD_TASK_COUNT
case TTaskType::REALTIME_PUSH:
case TTaskType::PUSH:
Expand Down Expand Up @@ -846,7 +848,9 @@ void check_consistency_callback(StorageEngine& engine, const TAgentTaskRequest&

void report_task_callback(const TMasterInfo& master_info) {
TReportRequest request;
random_sleep(5);
if (config::report_random_wait) {
random_sleep(5);
}
request.__isset.tasks = true;
{
std::lock_guard lock(s_task_signatures_mtx);
Expand All @@ -870,7 +874,9 @@ void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info)
// Random sleep 1~5 seconds before doing report.
// In order to avoid the problem that the FE receives many report requests at the same time
// and can not be processed.
random_sleep(5);
if (config::report_random_wait) {
random_sleep(5);
}

TReportRequest request;
request.__set_backend(BackendOptions::get_local_backend());
Expand Down Expand Up @@ -904,7 +910,9 @@ void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info)
}

void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_info) {
random_sleep(5);
if (config::report_random_wait) {
random_sleep(5);
}

TReportRequest request;
request.__set_backend(BackendOptions::get_local_backend());
Expand All @@ -921,6 +929,12 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf
DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
return;
}

std::map<int64_t, int64_t> partitions_version;
StorageEngine::instance()->tablet_manager()->get_partitions_visible_version(
&partitions_version);
request.__set_partitions_version(std::move(partitions_version));

int64_t max_compaction_score =
std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(),
DorisMetrics::instance()->tablet_base_max_compaction_score->value());
Expand Down Expand Up @@ -1634,6 +1648,12 @@ void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
engine.gc_binlogs(gc_tablet_infos);
}

void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
const TVisibleVersionReq& visible_version_req = req.visible_version_req;
engine.tablet_manager()->update_partitions_visible_version(
visible_version_req.partition_version);
}

void clone_callback(StorageEngine& engine, const TMasterInfo& master_info,
const TAgentTaskRequest& req) {
const auto& clone_req = req.clone_req;
Expand Down
2 changes: 2 additions & 0 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ void storage_medium_migrate_callback(StorageEngine& engine, const TAgentTaskRequ

void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void report_task_callback(const TMasterInfo& master_info);

void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info);
Expand Down
10 changes: 10 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ DEFINE_Int32(download_worker_count, "1");
DEFINE_Int32(make_snapshot_worker_count, "5");
// the count of thread to release snapshot
DEFINE_Int32(release_snapshot_worker_count, "5");
// report random wait a little time to avoid FE receiving multiple be reports at the same time.
// do not set it to false for production environment
DEFINE_mBool(report_random_wait, "true");
// the interval time(seconds) for agent report tasks signature to FE
DEFINE_mInt32(report_task_interval_seconds, "10");
// the interval time(seconds) for refresh storage policy from FE
Expand Down Expand Up @@ -422,6 +425,13 @@ DEFINE_Validator(compaction_task_num_per_fast_disk,
// How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation.
DEFINE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9");

// Not compact the invisible versions, but with some limitations:
// if not timeout, keep no more than compaction_keep_invisible_version_max_count versions;
// if timeout, keep no more than compaction_keep_invisible_version_min_count versions.
DEFINE_mInt32(compaction_keep_invisible_version_timeout_sec, "1800");
DEFINE_mInt32(compaction_keep_invisible_version_min_count, "50");
DEFINE_mInt32(compaction_keep_invisible_version_max_count, "500");

// Threshold to logging compaction trace, in seconds.
DEFINE_mInt32(base_compaction_trace_threshold, "60");
DEFINE_mInt32(cumulative_compaction_trace_threshold, "10");
Expand Down
10 changes: 10 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ DECLARE_Int32(download_worker_count);
DECLARE_Int32(make_snapshot_worker_count);
// the count of thread to release snapshot
DECLARE_Int32(release_snapshot_worker_count);
// report random wait a little time to avoid FE receiving multiple be reports at the same time.
// do not set it to false for production environment
DECLARE_mBool(report_random_wait);
// the interval time(seconds) for agent report tasks signature to FE
DECLARE_mInt32(report_task_interval_seconds);
// the interval time(seconds) for refresh storage policy from FE
Expand Down Expand Up @@ -473,6 +476,13 @@ DECLARE_mInt32(compaction_task_num_per_fast_disk);
// How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation.
DECLARE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round);

// Not compact the invisible versions, but with some limitations:
// if not timeout, keep no more than compaction_keep_invisible_version_max_count versions;
// if timeout, keep no more than compaction_keep_invisible_version_min_count versions.
DECLARE_mInt32(compaction_keep_invisible_version_timeout_sec);
DECLARE_mInt32(compaction_keep_invisible_version_min_count);
DECLARE_mInt32(compaction_keep_invisible_version_max_count);

// Threshold to logging compaction trace, in seconds.
DECLARE_mInt32(base_compaction_trace_threshold);
DECLARE_mInt32(cumulative_compaction_trace_threshold);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ Status FullCompaction::_check_all_version(const std::vector<RowsetSharedPtr>& ro
"Full compaction rowsets' versions not equal to all exist rowsets' versions. "
"full compaction rowsets max version={}-{}"
", current rowsets max version={}-{}"
"full compaction rowsets min version={}-{}, current rowsets min version=0-1",
", full compaction rowsets min version={}-{}, current rowsets min version=0-1",
last_rowset->start_version(), last_rowset->end_version(),
_tablet->max_version().first, _tablet->max_version().second,
first_rowset->start_version(), first_rowset->end_version());
Expand Down
21 changes: 21 additions & 0 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <gen_cpp/Types_types.h>
#include <netinet/in.h>

#include <atomic>
#include <charconv>
#include <cstdint>
#include <functional>
Expand All @@ -36,6 +37,7 @@
#include "io/io_common.h"
#include "olap/olap_define.h"
#include "util/hash_util.hpp"
#include "util/time.h"
#include "util/uid_util.h"

namespace doris {
Expand Down Expand Up @@ -511,4 +513,23 @@ struct RidAndPos {

using PartialUpdateReadPlan = std::map<RowsetId, std::map<uint32_t, std::vector<RidAndPos>>>;

// used for controll compaction
struct VersionWithTime {
std::atomic<int64_t> version;
int64_t update_ts;

VersionWithTime() : version(0), update_ts(MonotonicMillis()) {}

void update_version_monoto(int64_t new_version) {
int64_t cur_version = version.load(std::memory_order_relaxed);
while (cur_version < new_version) {
if (version.compare_exchange_strong(cur_version, new_version, std::memory_order_relaxed,
std::memory_order_relaxed)) {
update_ts = MonotonicMillis();
break;
}
}
}
};

} // namespace doris
71 changes: 53 additions & 18 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1289,23 +1289,15 @@ std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_cumulative_compac
if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
return candidate_rowsets;
}
{
std::shared_lock rlock(_meta_lock);
for (const auto& [version, rs] : _rs_version_map) {
if (version.first >= _cumulative_point && rs->is_local()) {
candidate_rowsets.push_back(rs);
}
}
}
std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
return candidate_rowsets;
return _pick_visible_rowsets_to_compaction(_cumulative_point,
std::numeric_limits<int64_t>::max());
}

std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_single_replica_compaction() {
std::vector<RowsetSharedPtr> candidate_rowsets;
{
std::shared_lock rlock(_meta_lock);
for (const auto& [version, rs] : _rs_version_map) {
for (const auto& [_, rs] : _rs_version_map) {
if (rs->is_local()) {
candidate_rowsets.push_back(rs);
}
Expand All @@ -1316,12 +1308,46 @@ std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_single_replica_co
}

std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_base_compaction() {
return _pick_visible_rowsets_to_compaction(std::numeric_limits<int64_t>::min(),
_cumulative_point - 1);
}

std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_full_compaction() {
return pick_candidate_rowsets_to_single_replica_compaction();
}

std::vector<RowsetSharedPtr> Tablet::_pick_visible_rowsets_to_compaction(
int64_t min_start_version, int64_t max_start_version) {
int64_t visible_version = std::numeric_limits<int64_t>::max();
int32_t keep_invisible_version_limit = config::compaction_keep_invisible_version_max_count;
if (auto version_info = std::atomic_load_explicit(&_visible_version, std::memory_order_relaxed);
version_info != nullptr) {
visible_version = version_info->version.load(std::memory_order_relaxed);
bool invisible_timeout = (MonotonicMillis() - version_info->update_ts) >
config::compaction_keep_invisible_version_timeout_sec * 1000L;
if (invisible_timeout) {
keep_invisible_version_limit = config::compaction_keep_invisible_version_min_count;
}
}

std::vector<RowsetSharedPtr> candidate_rowsets;
{
std::shared_lock rlock(_meta_lock);
int64_t tablet_max_version = max_version_unlocked().second;
for (const auto& [version, rs] : _rs_version_map) {
// Do compaction on local rowsets only.
if (version.first < _cumulative_point && rs->is_local()) {
int64_t version_start = version.first;
// rowset is remote or rowset is not in given range
if (!rs->is_local() || version_start < min_start_version ||
version_start > max_start_version) {
continue;
}

// can compact, met one of the conditions:
// 1. had been visible;
// 2. exceeds the limit of keep invisible versions.
int64_t version_end = version.second;
if (version_end <= visible_version ||
(tablet_max_version - version_end >= keep_invisible_version_limit)) {
candidate_rowsets.push_back(rs);
}
}
Expand All @@ -1330,10 +1356,6 @@ std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_base_compaction()
return candidate_rowsets;
}

std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_full_compaction() {
return pick_candidate_rowsets_to_single_replica_compaction();
}

std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_build_inverted_index(
const std::set<int32_t>& alter_index_uids, bool is_drop_op) {
std::vector<RowsetSharedPtr> candidate_rowsets;
Expand Down Expand Up @@ -1718,13 +1740,26 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info,
}
});

int64_t total_version_count = _tablet_meta->version_count();

// For compatibility.
// For old fe, it wouldn't send visible version request to be, then be's visible version is always 0.
// Let visible_version_count set to total_version_count in be's report.
int64_t visible_version_count = total_version_count;
if (auto version_info = std::atomic_load_explicit(&_visible_version, std::memory_order_relaxed);
version_info != nullptr) {
if (auto version = version_info->version.load(std::memory_order_relaxed); version > 0) {
visible_version_count = _tablet_meta->version_count_cross_with_range({0, version});
}
}
// the report version is the largest continuous version, same logic as in FE side
tablet_info->__set_version(cversion.second);
// Useless but it is a required filed in TTabletInfo
tablet_info->__set_version_hash(0);
tablet_info->__set_partition_id(_tablet_meta->partition_id());
tablet_info->__set_storage_medium(_data_dir->storage_medium());
tablet_info->__set_version_count(_tablet_meta->version_count());
tablet_info->__set_total_version_count(total_version_count);
tablet_info->__set_visible_version_count(visible_version_count);
tablet_info->__set_path_hash(_data_dir->path_hash());
tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema()->is_in_memory());
tablet_info->__set_replica_id(replica_id());
Expand Down
10 changes: 10 additions & 0 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,10 @@ class Tablet final : public BaseTablet {

std::string get_last_base_compaction_status() { return _last_base_compaction_status; }

void set_visible_version(const std::shared_ptr<const VersionWithTime>& visible_version) {
std::atomic_store_explicit(&_visible_version, visible_version, std::memory_order_relaxed);
}

inline bool all_beta() const {
std::shared_lock rdlock(_meta_lock);
return _tablet_meta->all_beta();
Expand Down Expand Up @@ -584,6 +588,9 @@ class Tablet final : public BaseTablet {
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy);
uint32_t _calc_base_compaction_score() const;

std::vector<RowsetSharedPtr> _pick_visible_rowsets_to_compaction(int64_t min_start_version,
int64_t max_start_version);

// When the proportion of empty edges in the adjacency matrix used to represent the version graph
// in the version tracker is greater than the threshold, rebuild the version tracker
bool _reconstruct_version_tracker_if_necessary();
Expand Down Expand Up @@ -702,6 +709,9 @@ class Tablet final : public BaseTablet {
DISALLOW_COPY_AND_ASSIGN(Tablet);

int64_t _io_error_times = 0;

// partition's visible version. it sync from fe, but not real-time.
std::shared_ptr<const VersionWithTime> _visible_version;
};

inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() {
Expand Down
Loading

0 comments on commit b03e29b

Please sign in to comment.