From b03e29be54ca43ff721edb16b3f08040d3425f4f Mon Sep 17 00:00:00 2001 From: yujun777 Date: Thu, 23 Nov 2023 10:04:28 +0800 Subject: [PATCH] fix query error -230 VERSION_ALREADY_MERGED --- be/src/agent/agent_server.cpp | 12 + be/src/agent/agent_server.h | 1 + be/src/agent/task_worker_pool.cpp | 26 +- be/src/agent/task_worker_pool.h | 2 + be/src/common/config.cpp | 10 + be/src/common/config.h | 10 + be/src/olap/full_compaction.cpp | 2 +- be/src/olap/olap_common.h | 21 ++ be/src/olap/tablet.cpp | 71 +++-- be/src/olap/tablet.h | 10 + be/src/olap/tablet_manager.cpp | 80 ++++-- be/src/olap/tablet_manager.h | 18 +- be/src/olap/tablet_meta.cpp | 10 + be/src/olap/tablet_meta.h | 1 + be/src/olap/task/engine_clone_task.cpp | 2 + .../org/apache/doris/alter/AlterHandler.java | 3 +- .../org/apache/doris/backup/RestoreJob.java | 3 +- .../apache/doris/catalog/MetadataViewer.java | 2 +- .../org/apache/doris/catalog/Replica.java | 75 +++-- .../java/org/apache/doris/catalog/Tablet.java | 22 +- .../doris/catalog/TabletInvertedIndex.java | 32 ++- .../apache/doris/catalog/TabletStatMgr.java | 11 +- .../apache/doris/clone/TabletSchedCtx.java | 13 +- .../apache/doris/clone/TabletScheduler.java | 6 +- .../doris/common/proc/ReplicasProcNode.java | 8 +- .../common/proc/TabletHealthProcDir.java | 3 +- .../doris/common/proc/TabletsProcDir.java | 11 +- .../doris/datasource/InternalCatalog.java | 5 +- .../apache/doris/master/ReportHandler.java | 67 +++-- .../org/apache/doris/system/Diagnoser.java | 5 +- .../org/apache/doris/task/AgentBatchTask.java | 10 + .../doris/task/UpdateVisibleVersionTask.java | 40 +++ .../transaction/DatabaseTransactionMgr.java | 24 +- .../transaction/GlobalTransactionMgr.java | 5 +- .../transaction/PublishVersionDaemon.java | 31 ++- .../apache/doris/alter/RollupJobV2Test.java | 10 +- .../doris/alter/SchemaChangeJobV2Test.java | 6 +- .../doris/analysis/AdminShowReplicaTest.java | 3 +- .../org/apache/doris/catalog/ReplicaTest.java | 21 +- 
.../clone/DiskReblanceWhenSchedulerIdle.java | 3 +- .../doris/clone/RebalancerTestUtil.java | 5 +- .../apache/doris/clone/RepairVersionTest.java | 8 +- .../doris/clone/TabletReplicaTooSlowTest.java | 4 +- .../doris/clone/TabletSchedCtxTest.java | 16 +- .../apache/doris/planner/QueryPlanTest.java | 12 +- .../DatabaseTransactionMgrTest.java | 6 +- .../transaction/GlobalTransactionMgrTest.java | 29 +- gensrc/thrift/AgentService.thrift | 5 + gensrc/thrift/BackendService.thrift | 5 +- gensrc/thrift/MasterService.thrift | 4 +- gensrc/thrift/Types.thrift | 3 +- .../test_compaction_with_visible_version.out | 259 ++++++++++++++++++ .../regression/suite/SuiteCluster.groovy | 5 +- ...est_compaction_with_visible_version.groovy | 259 ++++++++++++++++++ 54 files changed, 1093 insertions(+), 222 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/task/UpdateVisibleVersionTask.java create mode 100644 regression-test/data/compaction/test_compaction_with_visible_version.out create mode 100644 regression-test/suites/compaction/test_compaction_with_visible_version.groovy diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index 9902a0ad726e474..01468a23e81eb0f 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -144,6 +144,9 @@ void AgentServer::start_workers(ExecEnv* exec_env) { _gc_binlog_workers = std::make_unique( "GC_BINLOG", 1, [&engine](auto&& task) { return gc_binlog_callback(engine, task); }); + _visible_version_workers = std::make_unique( + "UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return visible_version_callback(engine, task); }); + _report_task_workers = std::make_unique( "REPORT_TASK", _master_info, config::report_task_interval_seconds, [&master_info = _master_info] { report_task_callback(master_info); }); @@ -259,6 +262,15 @@ void AgentServer::submit_tasks(TAgentResult& agent_result, "task(signature={}) has wrong request member = gc_binlog_req", signature); } break; + case 
TTaskType::UPDATE_VISIBLE_VERSION: + if (task.__isset.visible_version_req) { + _visible_version_workers->submit_task(task); + } else { + ret_st = Status::InvalidArgument( + "task(signature={}) has wrong request member = visible_version_req", + signature); + } + break; default: ret_st = Status::InvalidArgument("task(signature={}, type={}) has wrong task type", signature, task_type); diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h index b3889d894f9e972..44dd64c79a51786 100644 --- a/be/src/agent/agent_server.h +++ b/be/src/agent/agent_server.h @@ -96,6 +96,7 @@ class AgentServer { std::unique_ptr _push_storage_policy_workers; std::unique_ptr _topic_subscriber; std::unique_ptr _gc_binlog_workers; + std::unique_ptr _visible_version_workers; }; } // end namespace doris diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 5bb3fb8ad79e971..76771f5da75dd5e 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -351,6 +351,7 @@ bvar::Adder ALTER_count("task", "ALTER_TABLE"); bvar::Adder CLONE_count("task", "CLONE"); bvar::Adder STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRATE"); bvar::Adder GC_BINLOG_count("task", "GC_BINLOG"); +bvar::Adder UPDATE_VISIBLE_VERSION_count("task", "UPDATE_VISIBLE_VERSION"); void add_task_count(const TAgentTaskRequest& task, int n) { // clang-format off @@ -378,6 +379,7 @@ void add_task_count(const TAgentTaskRequest& task, int n) { ADD_TASK_COUNT(CLONE) ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE) ADD_TASK_COUNT(GC_BINLOG) + ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION) #undef ADD_TASK_COUNT case TTaskType::REALTIME_PUSH: case TTaskType::PUSH: @@ -846,7 +848,9 @@ void check_consistency_callback(StorageEngine& engine, const TAgentTaskRequest& void report_task_callback(const TMasterInfo& master_info) { TReportRequest request; - random_sleep(5); + if (config::report_random_wait) { + random_sleep(5); + } request.__isset.tasks = true; { std::lock_guard 
lock(s_task_signatures_mtx); @@ -870,7 +874,9 @@ void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info) // Random sleep 1~5 seconds before doing report. // In order to avoid the problem that the FE receives many report requests at the same time // and can not be processed. - random_sleep(5); + if (config::report_random_wait) { + random_sleep(5); + } TReportRequest request; request.__set_backend(BackendOptions::get_local_backend()); @@ -904,7 +910,9 @@ void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info) } void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_info) { - random_sleep(5); + if (config::report_random_wait) { + random_sleep(5); + } TReportRequest request; request.__set_backend(BackendOptions::get_local_backend()); @@ -921,6 +929,12 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1); return; } + + std::map partitions_version; + StorageEngine::instance()->tablet_manager()->get_partitions_visible_version( + &partitions_version); + request.__set_partitions_version(std::move(partitions_version)); + int64_t max_compaction_score = std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(), DorisMetrics::instance()->tablet_base_max_compaction_score->value()); @@ -1634,6 +1648,12 @@ void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req) { engine.gc_binlogs(gc_tablet_infos); } +void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req) { + const TVisibleVersionReq& visible_version_req = req.visible_version_req; + engine.tablet_manager()->update_partitions_visible_version( + visible_version_req.partition_version); +} + void clone_callback(StorageEngine& engine, const TMasterInfo& master_info, const TAgentTaskRequest& req) { const auto& clone_req = req.clone_req; diff --git a/be/src/agent/task_worker_pool.h 
b/be/src/agent/task_worker_pool.h index 1c9327b42d71966..a3c745ef7a14f2d 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -161,6 +161,8 @@ void storage_medium_migrate_callback(StorageEngine& engine, const TAgentTaskRequ void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req); +void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req); + void report_task_callback(const TMasterInfo& master_info); void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 932e24110b39fee..d193be97be260d4 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -182,6 +182,9 @@ DEFINE_Int32(download_worker_count, "1"); DEFINE_Int32(make_snapshot_worker_count, "5"); // the count of thread to release snapshot DEFINE_Int32(release_snapshot_worker_count, "5"); +// report random wait a little time to avoid FE receiving multiple be reports at the same time. +// do not set it to false for production environment +DEFINE_mBool(report_random_wait, "true"); // the interval time(seconds) for agent report tasks signature to FE DEFINE_mInt32(report_task_interval_seconds, "10"); // the interval time(seconds) for refresh storage policy from FE @@ -422,6 +425,13 @@ DEFINE_Validator(compaction_task_num_per_fast_disk, // How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation. DEFINE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9"); +// Not compact the invisible versions, but with some limitations: +// if not timeout, keep no more than compaction_keep_invisible_version_max_count versions; +// if timeout, keep no more than compaction_keep_invisible_version_min_count versions. 
+DEFINE_mInt32(compaction_keep_invisible_version_timeout_sec, "1800"); +DEFINE_mInt32(compaction_keep_invisible_version_min_count, "50"); +DEFINE_mInt32(compaction_keep_invisible_version_max_count, "500"); + // Threshold to logging compaction trace, in seconds. DEFINE_mInt32(base_compaction_trace_threshold, "60"); DEFINE_mInt32(cumulative_compaction_trace_threshold, "10"); diff --git a/be/src/common/config.h b/be/src/common/config.h index b40f7aa303445a0..e9b0927bc92db42 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -231,6 +231,9 @@ DECLARE_Int32(download_worker_count); DECLARE_Int32(make_snapshot_worker_count); // the count of thread to release snapshot DECLARE_Int32(release_snapshot_worker_count); +// report random wait a little time to avoid FE receiving multiple be reports at the same time. +// do not set it to false for production environment +DECLARE_mBool(report_random_wait); // the interval time(seconds) for agent report tasks signature to FE DECLARE_mInt32(report_task_interval_seconds); // the interval time(seconds) for refresh storage policy from FE @@ -473,6 +476,13 @@ DECLARE_mInt32(compaction_task_num_per_fast_disk); // How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation. DECLARE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round); +// Not compact the invisible versions, but with some limitations: +// if not timeout, keep no more than compaction_keep_invisible_version_max_count versions; +// if timeout, keep no more than compaction_keep_invisible_version_min_count versions. +DECLARE_mInt32(compaction_keep_invisible_version_timeout_sec); +DECLARE_mInt32(compaction_keep_invisible_version_min_count); +DECLARE_mInt32(compaction_keep_invisible_version_max_count); + // Threshold to logging compaction trace, in seconds. 
DECLARE_mInt32(base_compaction_trace_threshold); DECLARE_mInt32(cumulative_compaction_trace_threshold); diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp index 246056794c9a5d7..583a408a7c8ce01 100644 --- a/be/src/olap/full_compaction.cpp +++ b/be/src/olap/full_compaction.cpp @@ -134,7 +134,7 @@ Status FullCompaction::_check_all_version(const std::vector& ro "Full compaction rowsets' versions not equal to all exist rowsets' versions. " "full compaction rowsets max version={}-{}" ", current rowsets max version={}-{}" - "full compaction rowsets min version={}-{}, current rowsets min version=0-1", + ", full compaction rowsets min version={}-{}, current rowsets min version=0-1", last_rowset->start_version(), last_rowset->end_version(), _tablet->max_version().first, _tablet->max_version().second, first_rowset->start_version(), first_rowset->end_version()); diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 1921902a9d038b4..f43c2273a589d7d 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -36,6 +37,7 @@ #include "io/io_common.h" #include "olap/olap_define.h" #include "util/hash_util.hpp" +#include "util/time.h" #include "util/uid_util.h" namespace doris { @@ -511,4 +513,23 @@ struct RidAndPos { using PartialUpdateReadPlan = std::map>>; +// Visible version and the MonotonicMillis() time it last advanced; used to control compaction. +struct VersionWithTime { + std::atomic<int64_t> version; + std::atomic<int64_t> update_ts; + + VersionWithTime() : version(0), update_ts(MonotonicMillis()) {} + + void update_version_monoto(int64_t new_version) { + int64_t cur_version = version.load(std::memory_order_relaxed); + while (cur_version < new_version) { + if (version.compare_exchange_strong(cur_version, new_version, std::memory_order_relaxed, + std::memory_order_relaxed)) { + update_ts.store(MonotonicMillis(), std::memory_order_relaxed); + break; + } + } + } +}; + } // namespace doris diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 
33e27122da0feb6..5b13849c60942fe 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1289,23 +1289,15 @@ std::vector Tablet::pick_candidate_rowsets_to_cumulative_compac if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) { return candidate_rowsets; } - { - std::shared_lock rlock(_meta_lock); - for (const auto& [version, rs] : _rs_version_map) { - if (version.first >= _cumulative_point && rs->is_local()) { - candidate_rowsets.push_back(rs); - } - } - } - std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator); - return candidate_rowsets; + return _pick_visible_rowsets_to_compaction(_cumulative_point, + std::numeric_limits::max()); } std::vector Tablet::pick_candidate_rowsets_to_single_replica_compaction() { std::vector candidate_rowsets; { std::shared_lock rlock(_meta_lock); - for (const auto& [version, rs] : _rs_version_map) { + for (const auto& [_, rs] : _rs_version_map) { if (rs->is_local()) { candidate_rowsets.push_back(rs); } @@ -1316,12 +1308,46 @@ std::vector Tablet::pick_candidate_rowsets_to_single_replica_co } std::vector Tablet::pick_candidate_rowsets_to_base_compaction() { + return _pick_visible_rowsets_to_compaction(std::numeric_limits::min(), + _cumulative_point - 1); +} + +std::vector Tablet::pick_candidate_rowsets_to_full_compaction() { + return pick_candidate_rowsets_to_single_replica_compaction(); +} + +std::vector Tablet::_pick_visible_rowsets_to_compaction( + int64_t min_start_version, int64_t max_start_version) { + int64_t visible_version = std::numeric_limits::max(); + int32_t keep_invisible_version_limit = config::compaction_keep_invisible_version_max_count; + if (auto version_info = std::atomic_load_explicit(&_visible_version, std::memory_order_relaxed); + version_info != nullptr) { + visible_version = version_info->version.load(std::memory_order_relaxed); + bool invisible_timeout = (MonotonicMillis() - version_info->update_ts) > + config::compaction_keep_invisible_version_timeout_sec * 1000L; + if 
(invisible_timeout) { + keep_invisible_version_limit = config::compaction_keep_invisible_version_min_count; + } + } + std::vector candidate_rowsets; { std::shared_lock rlock(_meta_lock); + int64_t tablet_max_version = max_version_unlocked().second; for (const auto& [version, rs] : _rs_version_map) { - // Do compaction on local rowsets only. - if (version.first < _cumulative_point && rs->is_local()) { + int64_t version_start = version.first; + // rowset is remote or rowset is not in given range + if (!rs->is_local() || version_start < min_start_version || + version_start > max_start_version) { + continue; + } + + // can compact, met one of the conditions: + // 1. had been visible; + // 2. exceeds the limit of keep invisible versions. + int64_t version_end = version.second; + if (version_end <= visible_version || + (tablet_max_version - version_end >= keep_invisible_version_limit)) { candidate_rowsets.push_back(rs); } } @@ -1330,10 +1356,6 @@ std::vector Tablet::pick_candidate_rowsets_to_base_compaction() return candidate_rowsets; } -std::vector Tablet::pick_candidate_rowsets_to_full_compaction() { - return pick_candidate_rowsets_to_single_replica_compaction(); -} - std::vector Tablet::pick_candidate_rowsets_to_build_inverted_index( const std::set& alter_index_uids, bool is_drop_op) { std::vector candidate_rowsets; @@ -1718,13 +1740,26 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info, } }); + int64_t total_version_count = _tablet_meta->version_count(); + + // For compatibility. + // For old fe, it wouldn't send visible version request to be, then be's visible version is always 0. + // Let visible_version_count set to total_version_count in be's report. 
+ int64_t visible_version_count = total_version_count; + if (auto version_info = std::atomic_load_explicit(&_visible_version, std::memory_order_relaxed); + version_info != nullptr) { + if (auto version = version_info->version.load(std::memory_order_relaxed); version > 0) { + visible_version_count = _tablet_meta->version_count_cross_with_range({0, version}); + } + } // the report version is the largest continuous version, same logic as in FE side tablet_info->__set_version(cversion.second); // Useless but it is a required filed in TTabletInfo tablet_info->__set_version_hash(0); tablet_info->__set_partition_id(_tablet_meta->partition_id()); tablet_info->__set_storage_medium(_data_dir->storage_medium()); - tablet_info->__set_version_count(_tablet_meta->version_count()); + tablet_info->__set_total_version_count(total_version_count); + tablet_info->__set_visible_version_count(visible_version_count); tablet_info->__set_path_hash(_data_dir->path_hash()); tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema()->is_in_memory()); tablet_info->__set_replica_id(replica_id()); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 4174a0ec26db13d..3e8b31156b56804 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -324,6 +324,10 @@ class Tablet final : public BaseTablet { std::string get_last_base_compaction_status() { return _last_base_compaction_status; } + void set_visible_version(const std::shared_ptr& visible_version) { + std::atomic_store_explicit(&_visible_version, visible_version, std::memory_order_relaxed); + } + inline bool all_beta() const { std::shared_lock rdlock(_meta_lock); return _tablet_meta->all_beta(); @@ -584,6 +588,9 @@ class Tablet final : public BaseTablet { std::shared_ptr cumulative_compaction_policy); uint32_t _calc_base_compaction_score() const; + std::vector _pick_visible_rowsets_to_compaction(int64_t min_start_version, + int64_t max_start_version); + // When the proportion of empty edges in the adjacency matrix used to 
represent the version graph // in the version tracker is greater than the threshold, rebuild the version tracker bool _reconstruct_version_tracker_if_necessary(); @@ -702,6 +709,9 @@ class Tablet final : public BaseTablet { DISALLOW_COPY_AND_ASSIGN(Tablet); int64_t _io_error_times = 0; + + // partition's visible version. it sync from fe, but not real-time. + std::shared_ptr _visible_version; }; inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() { diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index ad9480f24060bd0..aeacc44ae354483 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -1005,13 +1005,14 @@ Status TabletManager::build_all_report_tablets_info(std::map tablet_info.__set_transaction_ids(find->second); expire_txn_map.erase(find); } - tablet_version_num_hist.add(tablet->version_count()); + tablet_version_num_hist.add(tablet_info.total_version_count); auto& t_tablet_stat = local_cache->emplace_back(); t_tablet_stat.__set_tablet_id(tablet_info.tablet_id); t_tablet_stat.__set_data_size(tablet_info.data_size); t_tablet_stat.__set_remote_data_size(tablet_info.remote_data_size); - t_tablet_stat.__set_row_num(tablet_info.row_count); - t_tablet_stat.__set_version_count(tablet_info.version_count); + t_tablet_stat.__set_row_count(tablet_info.row_count); + t_tablet_stat.__set_total_version_count(tablet_info.total_version_count); + t_tablet_stat.__set_visible_version_count(tablet_info.visible_version_count); }; for_each_tablet(handler, filter_all_tablets); @@ -1245,9 +1246,29 @@ void TabletManager::update_root_path_info(std::map* path_ma void TabletManager::get_partition_related_tablets(int64_t partition_id, std::set* tablet_infos) { - std::shared_lock rdlock(_partition_tablet_map_lock); - if (_partition_tablet_map.find(partition_id) != _partition_tablet_map.end()) { - *tablet_infos = _partition_tablet_map[partition_id]; + std::shared_lock rdlock(_partitions_lock); + auto it = 
_partitions.find(partition_id); + if (it != _partitions.end()) { + *tablet_infos = it->second.tablets; + } +} + +void TabletManager::get_partitions_visible_version(std::map* partitions_version) { + std::shared_lock rdlock(_partitions_lock); + for (const auto& [partition_id, partition] : _partitions) { + partitions_version->insert( + {partition_id, partition.visible_version->version.load(std::memory_order_relaxed)}); + } +} + +void TabletManager::update_partitions_visible_version( + const std::map& partitions_version) { + std::shared_lock rdlock(_partitions_lock); + for (auto [partition_id, version] : partitions_version) { + auto it = _partitions.find(partition_id); + if (it != _partitions.end()) { + it->second.visible_version->update_version_monoto(version); + } } } @@ -1338,15 +1359,25 @@ TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id) { } void TabletManager::_add_tablet_to_partition(const TabletSharedPtr& tablet) { - std::lock_guard wrlock(_partition_tablet_map_lock); - _partition_tablet_map[tablet->partition_id()].insert(tablet->get_tablet_info()); + std::lock_guard wrlock(_partitions_lock); + auto& partition = _partitions[tablet->partition_id()]; + partition.tablets.insert(tablet->get_tablet_info()); + tablet->set_visible_version( + std::static_pointer_cast(partition.visible_version)); } void TabletManager::_remove_tablet_from_partition(const TabletSharedPtr& tablet) { - std::lock_guard wrlock(_partition_tablet_map_lock); - _partition_tablet_map[tablet->partition_id()].erase(tablet->get_tablet_info()); - if (_partition_tablet_map[tablet->partition_id()].empty()) { - _partition_tablet_map.erase(tablet->partition_id()); + tablet->set_visible_version(nullptr); + std::lock_guard wrlock(_partitions_lock); + auto it = _partitions.find(tablet->partition_id()); + if (it == _partitions.end()) { + return; + } + + auto& tablets = it->second.tablets; + tablets.erase(tablet->get_tablet_info()); + if (tablets.empty()) { + _partitions.erase(it); } } 
@@ -1383,33 +1414,30 @@ void TabletManager::get_tablets_distribution_on_different_disks( std::map>& tablets_num_on_disk, std::map>>& tablets_info_on_disk) { std::vector data_dirs = StorageEngine::instance()->get_stores(); - std::map> partition_tablet_map; + std::map partitions; { - // When drop tablet, '_partition_tablet_map_lock' is locked in 'tablet_shard_lock'. - // To avoid locking 'tablet_shard_lock' in '_partition_tablet_map_lock', we lock and - // copy _partition_tablet_map here. - std::shared_lock rdlock(_partition_tablet_map_lock); - partition_tablet_map = _partition_tablet_map; - } - std::map>::iterator partition_iter = partition_tablet_map.begin(); - for (; partition_iter != partition_tablet_map.end(); ++partition_iter) { + // When drop tablet, '_partitions_lock' is locked in 'tablet_shard_lock'. + // To avoid locking 'tablet_shard_lock' in '_partitions_lock', we lock and + // copy _partitions here. + std::shared_lock rdlock(_partitions_lock); + partitions = _partitions; + } + for (const auto& [partition_id, partition] : partitions) { std::map tablets_num; std::map> tablets_info; for (int i = 0; i < data_dirs.size(); i++) { tablets_num[data_dirs[i]] = 0; } - int64_t partition_id = partition_iter->first; - std::set::iterator tablet_info_iter = (partition_iter->second).begin(); - for (; tablet_info_iter != (partition_iter->second).end(); ++tablet_info_iter) { + for (const auto& tablet_info : partition.tablets) { // get_tablet() will hold 'tablet_shard_lock' - TabletSharedPtr tablet = get_tablet(tablet_info_iter->tablet_id); + TabletSharedPtr tablet = get_tablet(tablet_info.tablet_id); if (tablet == nullptr) { continue; } DataDir* data_dir = tablet->data_dir(); size_t tablet_footprint = tablet->tablet_footprint(); tablets_num[data_dir]++; - TabletSize tablet_size(tablet_info_iter->tablet_id, tablet_footprint); + TabletSize tablet_size(tablet_info.tablet_id, tablet_footprint); tablets_info[data_dir].push_back(tablet_size); } tablets_num_on_disk[partition_id] 
= tablets_num; diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 39043f2d9b61c9c..9161131878e15ca 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -144,6 +144,10 @@ class TabletManager { void get_partition_related_tablets(int64_t partition_id, std::set* tablet_infos); + void get_partitions_visible_version(std::map* partitions_version); + + void update_partitions_visible_version(const std::map& partitions_version); + void do_tablet_meta_checkpoint(DataDir* data_dir); void obtain_specific_quantity_tablets(std::vector& tablets_info, int64_t num); @@ -225,6 +229,11 @@ class TabletManager { std::set tablets_under_clone; }; + struct Partition { + std::set tablets; + std::shared_ptr visible_version {new VersionWithTime}; + }; + // trace the memory use by meta of tablet std::shared_ptr _mem_tracker; std::shared_ptr _tablet_meta_mem_tracker; @@ -233,12 +242,13 @@ class TabletManager { const int32_t _tablets_shards_mask; std::vector _tablets_shards; - // Protect _partition_tablet_map, should not be obtained before _tablet_map_lock to avoid dead lock - std::shared_mutex _partition_tablet_map_lock; + // Protect _partitions, should not be obtained before _tablet_map_lock to avoid dead lock + std::shared_mutex _partitions_lock; + // partition_id => partition + std::map _partitions; + // Protect _shutdown_tablets, should not be obtained before _tablet_map_lock to avoid dead lock std::shared_mutex _shutdown_tablets_lock; - // partition_id => tablet_info - std::map> _partition_tablet_map; // the delete tablets. 
notice only allow function `start_trash_sweep` can erase tablets in _shutdown_tablets std::list _shutdown_tablets; std::mutex _gc_tablets_lock; diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 237a1259cbf0c2c..b9c3cd9e8bf6e6f 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -701,6 +701,16 @@ Version TabletMeta::max_version() const { return max_version; } +size_t TabletMeta::version_count_cross_with_range(const Version& range) const { + size_t count = 0; + for (const auto& rs_meta : _rs_metas) { + if (!(range.first > rs_meta->version().second || range.second < rs_meta->version().first)) { + count++; + } + } + return count; +} + Status TabletMeta::add_rs_meta(const RowsetMetaSharedPtr& rs_meta) { // check RowsetMeta is valid for (auto& rs : _rs_metas) { diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 60cc485882aee7b..5527b9123795388 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -157,6 +157,7 @@ class TabletMeta { // Remote disk space occupied by tablet. 
size_t tablet_remote_size() const; size_t version_count() const; + size_t version_count_cross_with_range(const Version& range) const; Version max_version() const; TabletState tablet_state() const; diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 997a5228c845b6e..97f5a3ba00e79cd 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -154,6 +154,8 @@ Status EngineCloneTask::execute() { } Status st = _do_clone(); StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id); + StorageEngine::instance()->tablet_manager()->update_partitions_visible_version( + {{_clone_req.partition_id, _clone_req.version}}); return st; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java index 7ab2f8732d0c60b..c7efbc69afc67ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -241,8 +241,7 @@ public void handleFinishAlterTask(AlterReplicaTask task) throws MetaNotFoundExce task.getSignature(), replica, task.getVersion()); boolean versionChanged = false; if (replica.getVersion() < task.getVersion()) { - replica.updateVersionInfo(task.getVersion(), replica.getDataSize(), replica.getRemoteDataSize(), - replica.getRowCount()); + replica.updateVersion(task.getVersion()); versionChanged = true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index df5d96deade0041..6ec8758fb820b56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1717,8 +1717,7 @@ private Status allTabletCommitted(boolean isReplay) { for (Tablet tablet : idx.getTablets()) { for (Replica replica : 
tablet.getReplicas()) { if (!replica.checkVersionCatchUp(part.getVisibleVersion(), false)) { - replica.updateVersionInfo(part.getVisibleVersion(), replica.getDataSize(), - replica.getRemoteDataSize(), replica.getRowCount()); + replica.updateVersion(part.getVisibleVersion()); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java index 696c54fa2ba140b..f488353bf60ca2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java @@ -107,7 +107,7 @@ private static List> getTabletStatus(String dbName, String tblName, row.add(String.valueOf(replica.getLastSuccessVersion())); row.add(String.valueOf(visibleVersion)); row.add(String.valueOf(replica.getSchemaHash())); - row.add(String.valueOf(replica.getVersionCount())); + row.add(String.valueOf(replica.getTotalVersionCount())); row.add(String.valueOf(replica.isBad())); row.add(replica.getState().name()); row.add(status.name()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index 946083082089502..9505a7e961c412d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -21,6 +21,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.DebugPointUtil; +import org.apache.doris.thrift.TTabletInfo; import org.apache.doris.thrift.TUniqueId; import com.google.gson.annotations.SerializedName; @@ -107,7 +108,8 @@ public enum ReplicaStatus { @SerializedName(value = "lastSuccessVersionHash") private long lastSuccessVersionHash = 0L; - private volatile long versionCount = -1; + private volatile long totalVersionCount = -1; + private volatile long visibleVersionCount = -1; private long 
pathHash = -1; @@ -229,14 +231,26 @@ public long getDataSize() { return dataSize; } + public void setDataSize(long dataSize) { + this.dataSize = dataSize; + } + public long getRemoteDataSize() { return remoteDataSize; } + public void setRemoteDataSize(long remoteDataSize) { + this.remoteDataSize = remoteDataSize; + } + public long getRowCount() { return rowCount; } + public void setRowCount(long rowCount) { + this.rowCount = rowCount; + } + public long getLastFailedVersion() { return lastFailedVersion; } @@ -317,28 +331,24 @@ public void setFurtherRepairWatermarkTxnTd(long furtherRepairWatermarkTxnTd) { this.furtherRepairWatermarkTxnTd = furtherRepairWatermarkTxnTd; } - // for compatibility - public synchronized void updateStat(long dataSize, long rowNum) { - this.dataSize = dataSize; - this.rowCount = rowNum; + public void updateWithReport(TTabletInfo backendReplica) { + updateVersion(backendReplica.getVersion()); + setDataSize(backendReplica.getDataSize()); + setRemoteDataSize(backendReplica.getRemoteDataSize()); + setRowCount(backendReplica.getRowCount()); + setTotalVersionCount(backendReplica.getTotalVersionCount()); + setVisibleVersionCount( + backendReplica.isSetVisibleVersionCount() ? 
backendReplica.getVisibleVersionCount() + : backendReplica.getTotalVersionCount()); } - public synchronized void updateStat(long dataSize, long remoteDataSize, long rowNum, long versionCount) { - this.dataSize = dataSize; - this.remoteDataSize = remoteDataSize; - this.rowCount = rowNum; - this.versionCount = versionCount; + public synchronized void updateVersion(long newVersion) { + updateReplicaVersion(newVersion, this.lastFailedVersion, this.lastSuccessVersion); } - public synchronized void updateVersionInfo(long newVersion, long newDataSize, long newRemoteDataSize, - long newRowCount) { - updateReplicaInfo(newVersion, this.lastFailedVersion, this.lastSuccessVersion, newDataSize, newRemoteDataSize, - newRowCount); - } - - public synchronized void updateVersionWithFailedInfo( + public synchronized void updateVersionWithFailed( long newVersion, long lastFailedVersion, long lastSuccessVersion) { - updateReplicaInfo(newVersion, lastFailedVersion, lastSuccessVersion, dataSize, remoteDataSize, rowCount); + updateReplicaVersion(newVersion, lastFailedVersion, lastSuccessVersion); } public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVersion, Long lastSuccessVersion, @@ -401,9 +411,7 @@ public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVer * the V(hash) equals to LSV(hash), and V equals to LFV, but LFV hash is 0 or some unknown number. * We just reset the LFV(hash) to recovery this replica. 
*/ - private void updateReplicaInfo(long newVersion, - long lastFailedVersion, long lastSuccessVersion, - long newDataSize, long newRemoteDataSize, long newRowCount) { + private void updateReplicaVersion(long newVersion, long lastFailedVersion, long lastSuccessVersion) { if (LOG.isDebugEnabled()) { LOG.debug("before update: {}", this.toString()); } @@ -428,9 +436,6 @@ private void updateReplicaInfo(long newVersion, long oldLastFailedVersion = this.lastFailedVersion; this.version = newVersion; - this.dataSize = newDataSize; - this.remoteDataSize = newRemoteDataSize; - this.rowCount = newRowCount; // just check it if (lastSuccessVersion <= this.version) { @@ -491,7 +496,7 @@ private void updateReplicaInfo(long newVersion, } public synchronized void updateLastFailedVersion(long lastFailedVersion) { - updateReplicaInfo(this.version, lastFailedVersion, this.lastSuccessVersion, dataSize, remoteDataSize, rowCount); + updateReplicaVersion(this.version, lastFailedVersion, this.lastSuccessVersion); } /* @@ -534,16 +539,28 @@ public boolean tooSlow() { return state == ReplicaState.COMPACTION_TOO_SLOW; } + public boolean tooBigVersionCount() { + return visibleVersionCount >= Config.min_version_count_indicate_replica_compaction_too_slow; + } + public boolean isNormal() { return state == ReplicaState.NORMAL; } - public long getVersionCount() { - return versionCount; + public long getTotalVersionCount() { + return totalVersionCount; + } + + public void setTotalVersionCount(long totalVersionCount) { + this.totalVersionCount = totalVersionCount; + } + + public long getVisibleVersionCount() { + return visibleVersionCount; } - public void setVersionCount(long versionCount) { - this.versionCount = versionCount; + public void setVisibleVersionCount(long visibleVersionCount) { + this.visibleVersionCount = visibleVersionCount; } public boolean checkVersionRegressive(long newVersion) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index 851932b721bf0d1..18c4a9f04ba847f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -268,16 +268,16 @@ public List getQueryableReplicas(long visibleVersion, boolean allowFail } if (Config.skip_compaction_slower_replica && allQueryableReplica.size() > 1) { - long minVersionCount = Long.MAX_VALUE; - for (Replica replica : allQueryableReplica) { - if (replica.getVersionCount() != -1 && replica.getVersionCount() < minVersionCount) { - minVersionCount = replica.getVersionCount(); - } + long minVersionCount = allQueryableReplica.stream().mapToLong(Replica::getVisibleVersionCount) + .filter(count -> count != -1).min().orElse(Long.MAX_VALUE); + long maxVersionCount = Config.min_version_count_indicate_replica_compaction_too_slow; + if (minVersionCount != Long.MAX_VALUE) { + maxVersionCount = Math.max(maxVersionCount, minVersionCount * QUERYABLE_TIMES_OF_MIN_VERSION_COUNT); } - final long finalMinVersionCount = minVersionCount; - return allQueryableReplica.stream().filter(replica -> replica.getVersionCount() == -1 - || replica.getVersionCount() < Config.min_version_count_indicate_replica_compaction_too_slow - || replica.getVersionCount() < finalMinVersionCount * QUERYABLE_TIMES_OF_MIN_VERSION_COUNT) + + final long finalMaxVersionCount = maxVersionCount; + return allQueryableReplica.stream() + .filter(replica -> replica.getVisibleVersionCount() < finalMaxVersionCount) .collect(Collectors.toList()); } return allQueryableReplica; @@ -515,7 +515,7 @@ public Pair getHealthStatusWithPriority(S if (versionCompleted) { stable++; - versions.add(replica.getVersionCount()); + versions.add(replica.getVisibleVersionCount()); allocNum = stableVersionCompleteAllocMap.getOrDefault(backend.getLocationTag(), (short) 0); stableVersionCompleteAllocMap.put(backend.getLocationTag(), (short) (allocNum + 1)); @@ -606,7 +606,7 @@ public 
Pair getHealthStatusWithPriority(S // get the max version diff long delta = versions.get(versions.size() - 1) - versions.get(0); double ratio = (double) delta / versions.get(versions.size() - 1); - if (versions.get(versions.size() - 1) > Config.min_version_count_indicate_replica_compaction_too_slow + if (versions.get(versions.size() - 1) >= Config.min_version_count_indicate_replica_compaction_too_slow && ratio > Config.valid_version_count_delta_ratio_between_replicas) { return Pair.of(TabletStatus.REPLICA_COMPACTION_TOO_SLOW, Priority.HIGH); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 0ac27c68c0d5cd1..413c16f96a0eeee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -124,11 +124,13 @@ private void writeUnlock(long stamp) { } public void tabletReport(long backendId, Map backendTablets, + Map backendPartitionsVersion, final HashMap storageMediumMap, ListMultimap tabletSyncMap, ListMultimap tabletDeleteFromMeta, Set tabletFoundInMeta, ListMultimap tabletMigrationMap, + Map partitionVersionSyncMap, Map> transactionsToPublish, ListMultimap transactionsToClear, ListMultimap tabletRecoveryMap, @@ -136,12 +138,14 @@ public void tabletReport(long backendId, Map backendTablets, List cooldownConfToPush, List cooldownConfToUpdate) { List> cooldownTablets = new ArrayList<>(); + long feTabletNum = 0; long stamp = readLock(); long start = System.currentTimeMillis(); try { - LOG.debug("begin to do tablet diff with backend[{}]. 
num: {}", backendId, backendTablets.size()); + LOG.debug("begin to do tablet diff with backend[{}]", backendId); Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); if (replicaMetaWithBackend != null) { + feTabletNum = replicaMetaWithBackend.size(); taskPool.submit(() -> { // traverse replicas in meta with this backend replicaMetaWithBackend.entrySet().parallelStream().forEach(entry -> { @@ -316,8 +320,11 @@ && isLocal(tabletMeta.getStorageMedium())) { // update replicase's version count // no need to write log, and no need to get db lock. - if (backendTabletInfo.isSetVersionCount()) { - replica.setVersionCount(backendTabletInfo.getVersionCount()); + if (backendTabletInfo.isSetTotalVersionCount()) { + replica.setTotalVersionCount(backendTabletInfo.getTotalVersionCount()); + replica.setVisibleVersionCount(backendTabletInfo.isSetVisibleVersionCount() + ? backendTabletInfo.getVisibleVersionCount() + : backendTabletInfo.getTotalVersionCount()); } if (tabletMetaInfo != null) { tabletMetaInfo.setTabletId(tabletId); @@ -334,6 +341,15 @@ && isLocal(tabletMeta.getStorageMedium())) { } } }); + + backendPartitionsVersion.entrySet().parallelStream().forEach(entry -> { + long partitionId = entry.getKey(); + long backendVersion = entry.getValue(); + PartitionCollectInfo partitionInfo = partitionCollectInfoMap.get(partitionId); + if (partitionInfo != null && partitionInfo.getVisibleVersion() > backendVersion) { + partitionVersionSyncMap.put(partitionId, partitionInfo.getVisibleVersion()); + } + }); }).join(); } } finally { @@ -342,11 +358,13 @@ && isLocal(tabletMeta.getStorageMedium())) { cooldownTablets.forEach(p -> handleCooldownConf(p.first, p.second, cooldownConfToPush, cooldownConfToUpdate)); long end = System.currentTimeMillis(); - LOG.info("finished to do tablet diff with backend[{}]. sync: {}." - + " metaDel: {}. foundInMeta: {}. migration: {}. " - + "found invalid transactions {}. found republish transactions {}. tabletToUpdate: {}." 
- + " need recovery: {}. cost: {} ms", backendId, tabletSyncMap.size(), + LOG.info("finished to do tablet diff with backend[{}]. fe tablet num: {}, backend tablet num: {}. sync: {}." + + " metaDel: {}. foundInMeta: {}. migration: {}. backend partition num: {}, backend need " + + "update: {}. found invalid transactions {}. found republish " + + "transactions {}. tabletToUpdate: {}. need recovery: {}. cost: {} ms", + backendId, feTabletNum, backendTablets.size(), tabletSyncMap.size(), tabletDeleteFromMeta.size(), tabletFoundInMeta.size(), tabletMigrationMap.size(), + backendPartitionsVersion.size(), partitionVersionSyncMap.size(), transactionsToClear.size(), transactionsToPublish.size(), tabletToUpdate.size(), tabletRecoveryMap.size(), (end - start)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index 202f0f3246efbc7..6c37bbaec4bd05a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -140,8 +140,12 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { if (invertedIndex.getTabletMeta(stat.getTabletId()) != null) { Replica replica = invertedIndex.getReplica(stat.getTabletId(), beId); if (replica != null) { - replica.updateStat(stat.getDataSize(), stat.getRemoteDataSize(), stat.getRowNum(), - stat.getVersionCount()); + replica.setDataSize(stat.getDataSize()); + replica.setRemoteDataSize(stat.getRemoteDataSize()); + replica.setRowCount(stat.getRowCount()); + replica.setTotalVersionCount(stat.getTotalVersionCount()); + replica.setVisibleVersionCount(stat.isSetVisibleVersionCount() ? stat.getVisibleVersionCount() + : stat.getTotalVersionCount()); } } } @@ -157,7 +161,8 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { continue; } // TODO(cmy) no db lock protected. 
I think it is ok even we get wrong row num - replica.updateStat(entry.getValue().getDataSize(), entry.getValue().getRowNum()); + replica.setDataSize(entry.getValue().getDataSize()); + replica.setRowCount(entry.getValue().getRowCount()); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index e37ba3315d596f5..292f12f74f414d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -565,15 +565,15 @@ public boolean compactionRecovered() { Replica chosenReplica = null; long maxVersionCount = Integer.MIN_VALUE; for (Replica replica : tablet.getReplicas()) { - if (replica.getVersionCount() > maxVersionCount) { - maxVersionCount = replica.getVersionCount(); + if (replica.getVisibleVersionCount() > maxVersionCount) { + maxVersionCount = replica.getVisibleVersionCount(); chosenReplica = replica; } } boolean recovered = false; for (Replica replica : tablet.getReplicas()) { if (replica.isAlive() && replica.tooSlow() && (!replica.equals(chosenReplica) - || replica.getVersionCount() < Config.min_version_count_indicate_replica_compaction_too_slow)) { + || !replica.tooBigVersionCount())) { if (chosenReplica != null) { chosenReplica.setState(ReplicaState.NORMAL); recovered = true; @@ -1139,8 +1139,7 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request) "replica does not exist. 
backend id: " + destBackendId); } - replica.updateVersionInfo(reportedTablet.getVersion(), reportedTablet.getDataSize(), - reportedTablet.getDataSize(), reportedTablet.getRowCount()); + replica.updateWithReport(reportedTablet); if (replica.getLastFailedVersion() > partition.getCommittedVersion() && reportedTablet.getVersion() >= partition.getCommittedVersion() //&& !(reportedTablet.isSetVersionMiss() && reportedTablet.isVersionMiss() @@ -1327,8 +1326,8 @@ public String toString() { public static class VersionCountComparator implements Comparator { @Override public int compare(Replica r1, Replica r2) { - long verCount1 = r1.getVersionCount() == -1 ? Long.MAX_VALUE : r1.getVersionCount(); - long verCount2 = r2.getVersionCount() == -1 ? Long.MAX_VALUE : r2.getVersionCount(); + long verCount1 = r1.getVisibleVersionCount() == -1 ? Long.MAX_VALUE : r1.getVisibleVersionCount(); + long verCount2 = r2.getVisibleVersionCount() == -1 ? Long.MAX_VALUE : r2.getVisibleVersionCount(); if (verCount1 < verCount2) { return -1; } else if (verCount1 > verCount2) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 59c62f719064efb..6eb61a31bf8b89a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -1072,13 +1072,13 @@ private void handleReplicaTooSlow(TabletSchedCtx tabletCtx) throws SchedExceptio if (replica.isAlive() && !replica.tooSlow()) { normalReplicaCount++; } - if (replica.getVersionCount() > maxVersionCount) { - maxVersionCount = replica.getVersionCount(); + if (replica.getVisibleVersionCount() > maxVersionCount) { + maxVersionCount = replica.getVisibleVersionCount(); chosenReplica = replica; } } if (chosenReplica != null && chosenReplica.isAlive() && !chosenReplica.tooSlow() - && chosenReplica.getVersionCount() > 
Config.min_version_count_indicate_replica_compaction_too_slow + && chosenReplica.tooBigVersionCount() && normalReplicaCount - 1 >= tabletCtx.getReplicas().size() / 2 + 1) { chosenReplica.setState(ReplicaState.COMPACTION_TOO_SLOW); LOG.info("set replica id :{} tablet id: {}, backend id: {} to COMPACTION_TOO_SLOW", diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java index 383ba14ab055692..b4beb0414c8736a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java @@ -42,8 +42,9 @@ public class ReplicasProcNode implements ProcNodeInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder().add("ReplicaId") .add("BackendId").add("Version").add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime") .add("SchemaHash").add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State").add("IsBad") - .add("VersionCount").add("PathHash").add("MetaUrl").add("CompactionStatus").add("CooldownReplicaId") - .add("CooldownMetaId").add("QueryHits").build(); + .add("VisibleVersionCount").add("TotalVersionCount").add("PathHash").add("MetaUrl") + .add("CompactionStatus").add("CooldownReplicaId").add("CooldownMetaId").add("QueryHits") + .build(); private long tabletId; private List replicas; @@ -103,7 +104,8 @@ public ProcResult fetchResult() { String.valueOf(replica.getRowCount()), String.valueOf(replica.getState()), String.valueOf(replica.isBad()), - String.valueOf(replica.getVersionCount()), + String.valueOf(replica.getVisibleVersionCount()), + String.valueOf(replica.getTotalVersionCount()), String.valueOf(replica.getPathHash()), metaUrl, compactionUrl, diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java index 3ce3ff74c7adafa..ce88a52082e8306 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java @@ -256,8 +256,7 @@ static class DBTabletStatistic { oversizeTabletIds.add(tablet.getId()); } for (Replica replica : tablet.getReplicas()) { - if (replica.getVersionCount() - > Config.min_version_count_indicate_replica_compaction_too_slow) { + if (replica.tooBigVersionCount()) { replicaCompactionTooSlowNum++; replicaCompactionTooSlowTabletIds.add(tablet.getId()); break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java index e13d1846a5aab13..9866a3276529067 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java @@ -52,8 +52,9 @@ public class TabletsProcDir implements ProcDirInterface { .add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime") .add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State") .add("LstConsistencyCheckTime").add("CheckVersion") - .add("VersionCount").add("QueryHits").add("PathHash").add("MetaUrl").add("CompactionStatus") - .add("CooldownReplicaId").add("CooldownMetaId").build(); + .add("VisibleVersionCount").add("TotalVersionCount").add("QueryHits").add("PathHash") + .add("MetaUrl").add("CompactionStatus").add("CooldownReplicaId").add("CooldownMetaId") + .build(); private Table table; private MaterializedIndex index; @@ -105,7 +106,8 @@ public List> fetchComparableResult(long version, long backendId tabletInfo.add(-1); // lst consistency check time tabletInfo.add(-1); // check version tabletInfo.add(-1); // check version hash - tabletInfo.add(-1); // version count + tabletInfo.add(-1); // visible 
version count + tabletInfo.add(-1); // total version count tabletInfo.add(0L); // query hits tabletInfo.add(-1); // path hash tabletInfo.add(FeConstants.null_string); // meta url @@ -138,7 +140,8 @@ public List> fetchComparableResult(long version, long backendId tabletInfo.add(TimeUtils.longToTimeString(tablet.getLastCheckTime())); tabletInfo.add(tablet.getCheckedVersion()); - tabletInfo.add(replica.getVersionCount()); + tabletInfo.add(replica.getVisibleVersionCount()); + tabletInfo.add(replica.getTotalVersionCount()); tabletInfo.add(replicaIdToQueryHits.getOrDefault(replica.getId(), 0L)); tabletInfo.add(replica.getPathHash()); Backend be = backendMap.get(replica.getBackendId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 3a080d2b28d1e88..024983a22944722 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -994,7 +994,10 @@ private void unprotectUpdateReplica(OlapTable olapTable, ReplicaPersistInfo info Tablet tablet = materializedIndex.getTablet(info.getTabletId()); Replica replica = tablet.getReplicaByBackendId(info.getBackendId()); Preconditions.checkNotNull(replica, info); - replica.updateVersionInfo(info.getVersion(), info.getDataSize(), info.getRemoteDataSize(), info.getRowCount()); + replica.updateVersion(info.getVersion()); + replica.setDataSize(info.getDataSize()); + replica.setRemoteDataSize(info.getRemoteDataSize()); + replica.setRowCount(info.getRowCount()); replica.setBad(false); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 71064585a310b43..fc003ad324a56d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -71,6 +71,7 @@ import org.apache.doris.task.PushStoragePolicyTask; import org.apache.doris.task.StorageMediaMigrationTask; import org.apache.doris.task.UpdateTabletMetaInfoTask; +import org.apache.doris.task.UpdateVisibleVersionTask; import org.apache.doris.thrift.TBackend; import org.apache.doris.thrift.TDisk; import org.apache.doris.thrift.TMasterResult; @@ -154,6 +155,7 @@ public TMasterResult handleReport(TReportRequest request) throws TException { Map> tasks = null; Map disks = null; Map tablets = null; + Map partitionsVersion = null; long reportVersion = -1; ReportType reportType = ReportType.UNKNOWN; @@ -179,11 +181,15 @@ public TMasterResult handleReport(TReportRequest request) throws TException { reportType = ReportType.TABLET; } + if (request.isSetPartitionsVersion()) { + partitionsVersion = request.getPartitionsVersion(); + } + if (request.isSetTabletMaxCompactionScore()) { backend.setTabletMaxCompactionScore(request.getTabletMaxCompactionScore()); } - ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, reportVersion, + ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, partitionsVersion, reportVersion, request.getStoragePolicy(), request.getResource(), request.getNumCores(), request.getPipelineExecutorSize()); try { @@ -230,6 +236,7 @@ private class ReportTask extends MasterTask { private Map> tasks; private Map disks; private Map tablets; + private Map partitionsVersion; private long reportVersion; private List storagePolicies; @@ -238,14 +245,15 @@ private class ReportTask extends MasterTask { private int pipelineExecutorSize; public ReportTask(long beId, Map> tasks, - Map disks, - Map tablets, long reportVersion, + Map disks, Map tablets, + Map partitionsVersion, long reportVersion, List storagePolicies, List storageResources, int cpuCores, int pipelineExecutorSize) { this.beId = beId; this.tasks = tasks; this.disks = disks; this.tablets = 
tablets; + this.partitionsVersion = partitionsVersion; this.reportVersion = reportVersion; this.storagePolicies = storagePolicies; this.storageResources = storageResources; @@ -272,7 +280,11 @@ protected void exec() { LOG.warn("out of date report version {} from backend[{}]. current report version[{}]", reportVersion, beId, backendReportVersion); } else { - ReportHandler.tabletReport(beId, tablets, reportVersion); + Map partitions = this.partitionsVersion; + if (partitions == null) { + partitions = Maps.newHashMap(); + } + ReportHandler.tabletReport(beId, tablets, partitions, reportVersion); } } } @@ -407,7 +419,8 @@ private static void diffResource(List storageResourcesInBe, Li } // public for fe ut - public static void tabletReport(long backendId, Map backendTablets, long backendReportVersion) { + public static void tabletReport(long backendId, Map backendTablets, + Map backendPartitionsVersion, long backendReportVersion) { long start = System.currentTimeMillis(); LOG.info("backend[{}] reports {} tablet(s). report version: {}", backendId, backendTablets.size(), backendReportVersion); @@ -425,6 +438,9 @@ public static void tabletReport(long backendId, Map backendTablet // storage medium -> tablet id ListMultimap tabletMigrationMap = LinkedListMultimap.create(); + // partition id -> visible version + Map partitionVersionSyncMap = Maps.newConcurrentMap(); + // dbid -> txn id -> [partition info] Map> transactionsToPublish = Maps.newHashMap(); ListMultimap transactionsToClear = LinkedListMultimap.create(); @@ -438,11 +454,13 @@ public static void tabletReport(long backendId, Map backendTablet List cooldownConfToUpdate = new LinkedList<>(); // 1. do the diff. 
find out (intersection) / (be - meta) / (meta - be) - Env.getCurrentInvertedIndex().tabletReport(backendId, backendTablets, storageMediumMap, + Env.getCurrentInvertedIndex().tabletReport(backendId, backendTablets, backendPartitionsVersion, + storageMediumMap, tabletSyncMap, tabletDeleteFromMeta, tabletFoundInMeta, tabletMigrationMap, + partitionVersionSyncMap, transactionsToPublish, transactionsToClear, tabletRecoveryMap, @@ -498,6 +516,9 @@ public static void tabletReport(long backendId, Map backendTablet if (!cooldownConfToUpdate.isEmpty()) { Env.getCurrentEnv().getCooldownConfHandler().addCooldownConfToUpdate(cooldownConfToUpdate); } + if (!partitionVersionSyncMap.isEmpty()) { + handleUpdatePartitionVersion(partitionVersionSyncMap, backendId); + } final SystemInfoService currentSystemInfo = Env.getCurrentSystemInfo(); Backend reportBackend = currentSystemInfo.getBackend(backendId); @@ -649,27 +670,17 @@ private static void sync(Map backendTablets, ListMultimap t.getSchemaHash() == schemaHash).findFirst().orElse(null); + if (tabletInfo == null) { continue; } + long metaVersion = replica.getVersion(); + long backendVersion = tabletInfo.getVersion(); boolean needSync = false; if (metaVersion < backendVersion) { needSync = true; @@ -692,7 +703,7 @@ private static void sync(Map backendTablets, ListMultimap backendTablets, ListMultimap partitionVersionSyncMap, long backendId) { + AgentBatchTask batchTask = new AgentBatchTask(); + UpdateVisibleVersionTask task = new UpdateVisibleVersionTask(backendId, partitionVersionSyncMap, + System.currentTimeMillis()); + batchTask.addTask(task); + AgentTaskExecutor.submit(batchTask); + } + private static void handleRecoverTablet(ListMultimap tabletRecoveryMap, Map backendTablets, long backendId) { // print a warn log here to indicate the exceptions on the backend diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java b/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java index 
fc0cfe08ea596d9..5e6ce5c165f6344 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java @@ -26,7 +26,6 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; -import org.apache.doris.common.Config; import com.google.common.collect.Lists; import org.json.simple.JSONObject; @@ -153,9 +152,9 @@ public static List> diagnoseTablet(long tabletId) { statusErr.append("Replica on backend " + replica.getBackendId() + "'s state is " + replica.getState() + ", and is bad: " + (replica.isBad() ? "Yes" : "No")); } - if (replica.getVersionCount() > Config.min_version_count_indicate_replica_compaction_too_slow) { + if (replica.tooBigVersionCount()) { compactionErr.append("Replica on backend " + replica.getBackendId() + "'s version count is too high: " - + replica.getVersionCount()); + + replica.getVisibleVersionCount()); } } results.add(Lists.newArrayList("ReplicaBackendStatus", (backendErr.length() == 0 diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java index aa654fcd21a2258..d942c68f51b884f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java @@ -47,6 +47,7 @@ import org.apache.doris.thrift.TTaskType; import org.apache.doris.thrift.TUpdateTabletMetaInfoReq; import org.apache.doris.thrift.TUploadReq; +import org.apache.doris.thrift.TVisibleVersionReq; import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; @@ -390,6 +391,15 @@ private TAgentTaskRequest toAgentTaskRequest(AgentTask task) { tAgentTaskRequest.setGcBinlogReq(request); return tAgentTaskRequest; } + case UPDATE_VISIBLE_VERSION: { + UpdateVisibleVersionTask visibleTask = (UpdateVisibleVersionTask) task; + TVisibleVersionReq 
request = visibleTask.toThrift(); + if (LOG.isDebugEnabled()) { + LOG.debug(request.toString()); + } + tAgentTaskRequest.setVisibleVersionReq(request); + return tAgentTaskRequest; + } default: LOG.debug("could not find task type for task [{}]", task); return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateVisibleVersionTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateVisibleVersionTask.java new file mode 100644 index 000000000000000..52ed3b1c490381b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateVisibleVersionTask.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.task; + +import org.apache.doris.thrift.TTaskType; +import org.apache.doris.thrift.TVisibleVersionReq; + +import java.util.Map; + +public class UpdateVisibleVersionTask extends AgentTask { + private Map partitionVisibleVersions; + + public UpdateVisibleVersionTask(long backendId, Map partitionVisibleVersions, long createTime) { + super(null, backendId, TTaskType.UPDATE_VISIBLE_VERSION, -1L, -1L, -1L, -1L, -1L, -1L, createTime); + this.partitionVisibleVersions = partitionVisibleVersions; + } + + public TVisibleVersionReq toThrift() { + TVisibleVersionReq request = new TVisibleVersionReq(); + partitionVisibleVersions.forEach((partitionId, version) -> { + request.putToPartitionVersion(partitionId, version); + }); + return request; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 83b4d164115ac6f..76326906428a6cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -44,6 +44,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.QuotaExceedException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.common.util.TimeUtils; @@ -933,7 +934,12 @@ protected List getCommittedTxnList() { } } - public void finishTransaction(long transactionId) throws UserException { + public void finishTransaction(long transactionId, Map partitionVisibleVersions, + Map> backendPartitions) throws UserException { + if (DebugPointUtil.isEnable("DatabaseTransactionMgr.stop_finish_transaction")) { + return; + } + TransactionState transactionState = null; readLock(); try { @@ -1124,7 +1130,7 @@ public void 
finishTransaction(long transactionId) throws UserException { LOG.warn("afterStateTransform txn {} failed. exception: ", transactionState, e); } } - updateCatalogAfterVisible(transactionState, db); + updateCatalogAfterVisible(transactionState, db, partitionVisibleVersions, backendPartitions); } finally { MetaLockUtils.writeUnlockTables(tableList); } @@ -1782,7 +1788,8 @@ private void updateCatalogAfterCommitted(TransactionState transactionState, Data } } - private boolean updateCatalogAfterVisible(TransactionState transactionState, Database db) { + private boolean updateCatalogAfterVisible(TransactionState transactionState, Database db, + Map partitionVisibleVersions, Map> backendPartitions) { Set errorReplicaIds = transactionState.getErrorReplicas(); for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) { long tableId = tableCommitInfo.getTableId(); @@ -1837,13 +1844,20 @@ private boolean updateCatalogAfterVisible(TransactionState transactionState, Dat lastFailedVersion = newCommitVersion; } } - replica.updateVersionWithFailedInfo(newVersion, lastFailedVersion, lastSuccessVersion); + replica.updateVersionWithFailed(newVersion, lastFailedVersion, lastSuccessVersion); + Set partitionIds = backendPartitions.get(replica.getBackendId()); + if (partitionIds == null) { + partitionIds = Sets.newHashSet(); + backendPartitions.put(replica.getBackendId(), partitionIds); + } + partitionIds.add(partitionId); } } } // end for indices long version = partitionCommitInfo.getVersion(); long versionTime = partitionCommitInfo.getVersionTime(); partition.updateVisibleVersionAndTime(version, versionTime); + partitionVisibleVersions.put(partition.getId(), version); if (LOG.isDebugEnabled()) { LOG.debug("transaction state {} set partition {}'s version to [{}]", transactionState, partition.getId(), version); @@ -1985,7 +1999,7 @@ public void replayUpsertTransactionState(TransactionState transactionState) thro if (transactionState.getTransactionStatus() 
== TransactionStatus.COMMITTED) { updateCatalogAfterCommitted(transactionState, db); } else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - updateCatalogAfterVisible(transactionState, db); + updateCatalogAfterVisible(transactionState, db, Maps.newHashMap(), Maps.newHashMap()); } unprotectUpsertTransactionState(transactionState, true); } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index ee228a10fc867d7..356869b240ade6e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -406,9 +406,10 @@ public boolean existCommittedTxns(Long dbId, Long tableId, Long partitionId) { * @param transactionId * @return */ - public void finishTransaction(long dbId, long transactionId) throws UserException { + public void finishTransaction(long dbId, long transactionId, Map partitionVisibleVersions, + Map> backendPartitions) throws UserException { DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId); - dbTransactionMgr.finishTransaction(transactionId); + dbTransactionMgr.finishTransaction(transactionId, partitionVisibleVersions, backendPartitions); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index 250f2c56665b761..6e35abe319d4b9f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -27,6 +27,7 @@ import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.PublishVersionTask; +import org.apache.doris.task.UpdateVisibleVersionTask; import 
org.apache.doris.thrift.TPartitionVersionInfo; import org.apache.doris.thrift.TTaskType; @@ -40,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import java.util.stream.Stream; public class PublishVersionDaemon extends MasterDaemon { @@ -52,14 +54,18 @@ public PublishVersionDaemon() { @Override protected void runAfterCatalogReady() { + Map partitionVisibleVersions = Maps.newHashMap(); + Map> backendPartitions = Maps.newHashMap(); + try { - publishVersion(); + publishVersion(partitionVisibleVersions, backendPartitions); + sendBackendVisibleVersion(partitionVisibleVersions, backendPartitions); } catch (Throwable t) { LOG.error("errors while publish version to all backends", t); } } - private void publishVersion() { + private void publishVersion(Map partitionVisibleVersions, Map> backendPartitions) { if (DebugPointUtil.isEnable("PublishVersionDaemon.stop_publish")) { return; } @@ -157,7 +163,7 @@ private void publishVersion() { try { // one transaction exception should not affect other transaction globalTransactionMgr.finishTransaction(transactionState.getDbId(), - transactionState.getTransactionId()); + transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions); } catch (Exception e) { LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e); } @@ -181,4 +187,23 @@ private void publishVersion() { } } // end for readyTransactionStates } + + private void sendBackendVisibleVersion(Map partitionVisibleVersions, + Map> backendPartitions) { + if (partitionVisibleVersions.isEmpty() || backendPartitions.isEmpty()) { + return; + } + + long createTime = System.currentTimeMillis(); + AgentBatchTask batchTask = new AgentBatchTask(); + backendPartitions.forEach((backendId, partitionIds) -> { + Map backendPartitionVersions = partitionVisibleVersions.entrySet().stream() + .filter(entry -> partitionIds.contains(entry.getKey())) + 
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + UpdateVisibleVersionTask task = new UpdateVisibleVersionTask(backendId, backendPartitionVersions, + createTime); + batchTask.addTask(task); + }); + AgentTaskExecutor.submit(batchTask); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java b/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java index 9c60846a4de1ff3..8eb693b470055c4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java @@ -221,10 +221,7 @@ public void testSchemaChange1() throws Exception { MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0); for (Tablet shadowTablet : shadowIndex.getTablets()) { for (Replica shadowReplica : shadowTablet.getReplicas()) { - shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), - shadowReplica.getDataSize(), - shadowReplica.getRemoteDataSize(), - shadowReplica.getRowCount()); + shadowReplica.updateVersion(testPartition.getVisibleVersion()); } } @@ -301,10 +298,7 @@ public void testSchemaChangeWhileTabletNotStable() throws Exception { MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0); for (Tablet shadowTablet : shadowIndex.getTablets()) { for (Replica shadowReplica : shadowTablet.getReplicas()) { - shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), - shadowReplica.getDataSize(), - shadowReplica.getRemoteDataSize(), - shadowReplica.getRowCount()); + shadowReplica.updateVersion(testPartition.getVisibleVersion()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java index 2d3a9aac5aee9a6..002247e8c76bace 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java +++ 
b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java @@ -213,8 +213,7 @@ public void testSchemaChange1() throws Exception { MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0); for (Tablet shadowTablet : shadowIndex.getTablets()) { for (Replica shadowReplica : shadowTablet.getReplicas()) { - shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), shadowReplica.getDataSize(), - shadowReplica.getRemoteDataSize(), shadowReplica.getRowCount()); + shadowReplica.updateVersion(testPartition.getVisibleVersion()); } } @@ -296,8 +295,7 @@ public void testSchemaChangeWhileTabletNotStable() throws Exception { MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0); for (Tablet shadowTablet : shadowIndex.getTablets()) { for (Replica shadowReplica : shadowTablet.getReplicas()) { - shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), shadowReplica.getDataSize(), - shadowReplica.getRemoteDataSize(), shadowReplica.getRowCount()); + shadowReplica.updateVersion(testPartition.getVisibleVersion()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminShowReplicaTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminShowReplicaTest.java index 2c2caf0c5beb1c7..d9e1b367b0c30fc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminShowReplicaTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminShowReplicaTest.java @@ -73,7 +73,8 @@ public void testShowReplicaDistribution() throws Exception { for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) { for (Tablet tablet : index.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateStat(1024, 2); + replica.setDataSize(1024L); + replica.setRowCount(2L); } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java index d6a81cdd88339fd..eb7dbca0775e545 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java @@ -65,12 +65,7 @@ public void getMethodTest() { // update new version long newVersion = version + 1; - long newDataSize = dataSize + 100; - long newRowCount = rowCount + 10; - replica.updateVersionInfo(newVersion, newDataSize, 0, newRowCount); - Assert.assertEquals(newVersion, replica.getVersion()); - Assert.assertEquals(newDataSize, replica.getDataSize()); - Assert.assertEquals(newRowCount, replica.getRowCount()); + replica.updateVersion(newVersion); // check version catch up Assert.assertFalse(replica.checkVersionCatchUp(5, false)); @@ -132,14 +127,14 @@ public void testSerialization() throws Exception { public void testUpdateVersion1() { Replica originalReplica = new Replica(10000, 20000, 3, 0, 100, 0, 78, ReplicaState.NORMAL, 0, 3); // new version is little than original version, it is invalid the version will not update - originalReplica.updateVersionInfo(2, 100, 0, 78); + originalReplica.updateVersion(2); Assert.assertEquals(3, originalReplica.getVersion()); } @Test public void testUpdateVersion2() { Replica originalReplica = new Replica(10000, 20000, 3, 0, 100, 0, 78, ReplicaState.NORMAL, 0, 0); - originalReplica.updateVersionInfo(3, 100, 0, 78); + originalReplica.updateVersion(3); // if new version >= current version and last success version <= new version, then last success version should be updated Assert.assertEquals(3, originalReplica.getLastSuccessVersion()); Assert.assertEquals(3, originalReplica.getVersion()); @@ -155,7 +150,7 @@ public void testUpdateVersion3() { Assert.assertEquals(8, originalReplica.getLastFailedVersion()); // update last success version 10 - originalReplica.updateVersionWithFailedInfo(originalReplica.getVersion(), + 
originalReplica.updateVersionWithFailed(originalReplica.getVersion(), originalReplica.getLastFailedVersion(), 10); Assert.assertEquals(10, originalReplica.getLastSuccessVersion()); @@ -163,7 +158,7 @@ public void testUpdateVersion3() { Assert.assertEquals(8, originalReplica.getLastFailedVersion()); // update version to 8, the last success version and version should be 10 - originalReplica.updateVersionInfo(8, 100, 0, 78); + originalReplica.updateVersion(8); Assert.assertEquals(10, originalReplica.getLastSuccessVersion()); Assert.assertEquals(10, originalReplica.getVersion()); Assert.assertEquals(-1, originalReplica.getLastFailedVersion()); @@ -175,7 +170,7 @@ public void testUpdateVersion3() { Assert.assertEquals(12, originalReplica.getLastFailedVersion()); // update last success version to 15 - originalReplica.updateVersionWithFailedInfo(originalReplica.getVersion(), + originalReplica.updateVersionWithFailed(originalReplica.getVersion(), originalReplica.getLastFailedVersion(), 15); Assert.assertEquals(15, originalReplica.getLastSuccessVersion()); @@ -189,13 +184,13 @@ public void testUpdateVersion3() { Assert.assertEquals(18, originalReplica.getLastFailedVersion()); // update version to 17 then version and success version is 17 - originalReplica.updateVersionInfo(17, 100, 0, 78); + originalReplica.updateVersion(17); Assert.assertEquals(17, originalReplica.getLastSuccessVersion()); Assert.assertEquals(17, originalReplica.getVersion()); Assert.assertEquals(18, originalReplica.getLastFailedVersion()); // update version to 18, then version and last success version should be 18 and failed version should be -1 - originalReplica.updateVersionInfo(18, 100, 0, 78); + originalReplica.updateVersion(18); Assert.assertEquals(18, originalReplica.getLastSuccessVersion()); Assert.assertEquals(18, originalReplica.getVersion()); Assert.assertEquals(-1, originalReplica.getLastFailedVersion()); diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java index 028a07941c91d25..f2f4e65b25e2663 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java @@ -110,7 +110,8 @@ public void testDiskReblanceWhenSchedulerIdle() throws Exception { Lists.newArrayList(tablet.getReplicas()).forEach( replica -> { if (replica.getBackendId() == backends.get(1).getId()) { - replica.updateStat(totalCapacity / 4, 1); + replica.setDataSize(totalCapacity / 4); + replica.setRowCount(1); tablet.deleteReplica(replica); replica.setBackendId(backends.get(0).getId()); replica.setPathHash(diskInfo0.getPathHash()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java index da03d42a644cba3..2883c88d07b7953 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java @@ -103,7 +103,7 @@ public static void createReplicasAndAddToIndex(TabletInvertedIndex invertedIndex replica.setPathHash(beIds.get(i)); if (replicaSizes != null) { // for disk rebalancer, every beId corresponding to a replicaSize - replica.updateStat(replicaSizes.get(i), 0); + replica.setDataSize(replicaSizes.get(i)); } // isRestore set true, to avoid modifying Catalog.getCurrentInvertedIndex tablet.addReplica(replica, true); @@ -164,7 +164,8 @@ public static void updateReplicaDataSize(long minReplicaSize, int tableSkew, in for (Tablet tablet : idx.getTablets()) { long tabletSize = tableBaseSize * (1 + random.nextInt(tabletSkew)); for (Replica replica : tablet.getReplicas()) { - replica.updateStat(tabletSize, 1000L); + replica.setDataSize(tabletSize); + replica.setRowCount(1000L); } } } 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java index 7539548583c5024..e33cfef989dc7ff 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java @@ -106,7 +106,7 @@ public void testRepairLastFailedVersionByReport() throws Exception { tablets.put(tablet.getId(), tTablet); Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); - ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); + ReportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L); Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); Assertions.assertEquals(-1L, replica.getLastFailedVersion()); @@ -136,11 +136,11 @@ public void testVersionRegressive() throws Exception { Map tablets = Maps.newHashMap(); tablets.put(tablet.getId(), tTablet); - ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); + ReportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L); Assertions.assertEquals(-1L, replica.getLastFailedVersion()); DebugPointUtil.addDebugPoint("Replica.regressive_version_immediately", new DebugPoint()); - ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); + ReportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L); Assertions.assertEquals(replica.getVersion() + 1, replica.getLastFailedVersion()); Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); @@ -160,7 +160,7 @@ private TableInfo prepareTableForTest(String tableName) throws Exception { long visibleVersion = 2L; partition.updateVisibleVersion(visibleVersion); partition.setNextVersion(visibleVersion + 1); - tablet.getReplicas().forEach(replica -> replica.updateVersionInfo(visibleVersion, 1L, 1L, 1L)); + tablet.getReplicas().forEach(replica -> 
replica.updateVersion(visibleVersion)); Replica replica = tablet.getReplicas().iterator().next(); Assertions.assertEquals(visibleVersion, replica.getVersion()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java index 6a38985b73f6e74..7d918ef7db54ae9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java @@ -141,7 +141,7 @@ private static void updateReplicaVersionCount() { List pathHashes = be.getDisks().values().stream() .map(DiskInfo::getPathHash).collect(Collectors.toList()); Replica replica = cell.getValue(); - replica.setVersionCount(versionCount); + replica.setVisibleVersionCount(versionCount); versionCount = versionCount + 200; replica.setPathHash(pathHashes.get(0)); @@ -171,7 +171,7 @@ public void test() throws Exception { boolean found = false; for (Table.Cell cell : replicaMetaTable.cellSet()) { Replica replica = cell.getValue(); - if (replica.getVersionCount() == 401) { + if (replica.getVisibleVersionCount() == 401) { if (replica.tooSlow()) { LOG.info("set to TOO_SLOW."); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java index d4578e17d7fb389..852f072eca1c353 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java @@ -84,19 +84,19 @@ public void testVersionCountComparator() { TabletSchedCtx.VersionCountComparator countComparator = new TabletSchedCtx.VersionCountComparator(); List replicaList = Lists.newArrayList(); Replica replica1 = new Replica(); - replica1.setVersionCount(100); + replica1.setVisibleVersionCount(100); replica1.setState(Replica.ReplicaState.NORMAL); Replica replica2 = new Replica(); - 
replica2.setVersionCount(50); + replica2.setVisibleVersionCount(50); replica2.setState(Replica.ReplicaState.NORMAL); Replica replica3 = new Replica(); - replica3.setVersionCount(-1); + replica3.setVisibleVersionCount(-1); replica3.setState(Replica.ReplicaState.NORMAL); Replica replica4 = new Replica(); - replica4.setVersionCount(200); + replica4.setVisibleVersionCount(200); replica4.setState(Replica.ReplicaState.NORMAL); replicaList.add(replica1); @@ -105,10 +105,10 @@ public void testVersionCountComparator() { replicaList.add(replica4); Collections.sort(replicaList, countComparator); - Assert.assertEquals(50, replicaList.get(0).getVersionCount()); - Assert.assertEquals(100, replicaList.get(1).getVersionCount()); - Assert.assertEquals(200, replicaList.get(2).getVersionCount()); - Assert.assertEquals(-1, replicaList.get(3).getVersionCount()); + Assert.assertEquals(50, replicaList.get(0).getVisibleVersionCount()); + Assert.assertEquals(100, replicaList.get(1).getVisibleVersionCount()); + Assert.assertEquals(200, replicaList.get(2).getVisibleVersionCount()); + Assert.assertEquals(-1, replicaList.get(3).getVisibleVersionCount()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index 9e72058a8dddd4d..ccac1b4296063e6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -1100,7 +1100,7 @@ public void testBucketShuffleJoin() throws Exception { mIndex.setRowCount(10000); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 200000, 0, 10000); + replica.updateVersion(2); } } } @@ -1114,7 +1114,7 @@ public void testBucketShuffleJoin() throws Exception { mIndex.setRowCount(10000); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - 
replica.updateVersionInfo(2, 200000, 0, 10000); + replica.updateVersion(2); } } } @@ -1198,7 +1198,7 @@ public void testJoinWithMysqlTable() throws Exception { mIndex.setRowCount(10000); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 200000, 0, 10000); + replica.updateVersion(2); } } } @@ -1228,7 +1228,7 @@ public void testJoinWithMysqlTable() throws Exception { mIndex.setRowCount(0); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 0, 0, 0); + replica.updateVersion(2); } } } @@ -1248,7 +1248,7 @@ public void testJoinWithOdbcTable() throws Exception { mIndex.setRowCount(10000); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 200000, 0, 10000); + replica.updateVersion(2); } } } @@ -1277,7 +1277,7 @@ public void testJoinWithOdbcTable() throws Exception { mIndex.setRowCount(0); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 0, 0, 0); + replica.updateVersion(2); } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java index b988650fa2f44cb..70aec9fdd3933f2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java @@ -42,6 +42,7 @@ import java.lang.reflect.InvocationTargetException; import java.util.List; import java.util.Map; +import java.util.Set; public class DatabaseTransactionMgrTest { @@ -115,7 +116,10 @@ public Map addTransactionToTransactionMgr() throws UserException { setTransactionFinishPublish(transactionState1, Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId2, 
CatalogTestUtil.testBackendId3)); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId1); + Map partitionVisibleVersions = Maps.newHashMap(); + Map> backendPartitions = Maps.newHashMap(); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId1, + partitionVisibleVersions, backendPartitions); labelToTxnId.put(CatalogTestUtil.testTxnLabel1, transactionId1); TransactionState.TxnCoordinator beTransactionSource = new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.BE, "be1"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index 89f6e0b8145d6d3..eb5d4f6e4266811 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -53,8 +53,10 @@ import org.apache.doris.transaction.TransactionState.TxnCoordinator; import org.apache.doris.transaction.TransactionState.TxnSourceType; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import mockit.Injectable; import mockit.Mocked; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -65,6 +67,7 @@ import java.lang.reflect.InvocationTargetException; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; public class GlobalTransactionMgrTest { @@ -469,7 +472,10 @@ public void testFinishTransaction() throws UserException { CatalogTestUtil.testBackendId2, CatalogTestUtil.testBackendId3)); transactionState.getPublishVersionTasks() .get(CatalogTestUtil.testBackendId1).getErrorTablets().add(CatalogTestUtil.testTabletId1); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId); + Map partitionVisibleVersions = Maps.newHashMap(); + Map> 
backendPartitions = Maps.newHashMap(); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId, + partitionVisibleVersions, backendPartitions); transactionState = fakeEditLog.getTransaction(transactionId); Assert.assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus()); // check replica version @@ -488,6 +494,14 @@ public void testFinishTransaction() throws UserException { } } + + Assert.assertEquals(ImmutableMap.of(CatalogTestUtil.testPartition1, CatalogTestUtil.testStartVersion + 1), + partitionVisibleVersions); + Set partitionIds = Sets.newHashSet(testPartition.getId()); + Assert.assertEquals(partitionIds, backendPartitions.get(CatalogTestUtil.testBackendId1)); + Assert.assertEquals(partitionIds, backendPartitions.get(CatalogTestUtil.testBackendId2)); + Assert.assertEquals(partitionIds, backendPartitions.get(CatalogTestUtil.testBackendId3)); + // slave replay new state and compare catalog slaveTransMgr.replayUpsertTransactionState(transactionState); Assert.assertTrue(CatalogTestUtil.compareCatalog(masterEnv, slaveEnv)); @@ -530,8 +544,13 @@ public void testFinishTransactionWithOneFailed() throws UserException { // backend2 publish failed transactionState.getPublishVersionTasks() .get(CatalogTestUtil.testBackendId2).getErrorTablets().add(CatalogTestUtil.testTabletId1); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId); + Map partitionVisibleVersions = Maps.newHashMap(); + Map> backendPartitions = Maps.newHashMap(); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId, + partitionVisibleVersions, backendPartitions); Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); + Assert.assertTrue(partitionVisibleVersions.isEmpty()); + Assert.assertTrue(backendPartitions.isEmpty()); Replica replica1 = tablet.getReplicaById(CatalogTestUtil.testReplicaId1); Replica replica2 = tablet.getReplicaById(CatalogTestUtil.testReplicaId2); Replica replica3 
= tablet.getReplicaById(CatalogTestUtil.testReplicaId3); @@ -549,7 +568,8 @@ public void testFinishTransactionWithOneFailed() throws UserException { backend2SuccTablets.put(CatalogTestUtil.testTabletId1, 0L); transactionState.getPublishVersionTasks() .get(CatalogTestUtil.testBackendId2).setSuccTablets(backend2SuccTablets); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId, + partitionVisibleVersions, backendPartitions); Assert.assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus()); Assert.assertEquals(CatalogTestUtil.testStartVersion + 1, replica1.getVersion()); Assert.assertEquals(CatalogTestUtil.testStartVersion + 1, replica2.getVersion()); @@ -614,7 +634,8 @@ public void testFinishTransactionWithOneFailed() throws UserException { DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState, Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId2, CatalogTestUtil.testBackendId3)); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId2); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId2, + partitionVisibleVersions, backendPartitions); Assert.assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus()); Assert.assertEquals(CatalogTestUtil.testStartVersion + 2, replica1.getVersion()); Assert.assertEquals(CatalogTestUtil.testStartVersion + 2, replica2.getVersion()); diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 6f777675f3ee0ab..cf3f4c915c6987e 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -379,6 +379,10 @@ struct TPublishVersionRequest { 3: optional bool strict_mode = false } +struct TVisibleVersionReq { + 1: required map partition_version +} + struct TClearAlterTaskRequest { 1: required Types.TTabletId tablet_id 2: required Types.TSchemaHash 
schema_hash @@ -477,6 +481,7 @@ struct TAgentTaskRequest { 31: optional TPushStoragePolicyReq push_storage_policy_req 32: optional TAlterInvertedIndexReq alter_inverted_index_req 33: optional TGcBinlogReq gc_binlog_req + 34: optional TVisibleVersionReq visible_version_req } struct TAgentResult { diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 1f2e8185a660c78..6cdfdb95553d9aa 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -33,9 +33,10 @@ struct TTabletStat { 1: required i64 tablet_id // local data size 2: optional i64 data_size - 3: optional i64 row_num - 4: optional i64 version_count + 3: optional i64 row_count + 4: optional i64 total_version_count 5: optional i64 remote_data_size + 6: optional i64 visible_version_count } struct TTabletStatResult { diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index 9acd3f85f7b0433..7442a86a9904a11 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -33,7 +33,7 @@ struct TTabletInfo { 6: required Types.TSize data_size 7: optional Types.TStorageMedium storage_medium 8: optional list transaction_ids - 9: optional i64 version_count + 9: optional i64 total_version_count 10: optional i64 path_hash 11: optional bool version_miss 12: optional bool used @@ -46,6 +46,7 @@ struct TTabletInfo { // 18: optional bool is_cooldown 19: optional i64 cooldown_term 20: optional Types.TUniqueId cooldown_meta_id + 21: optional i64 visible_version_count } struct TFinishTaskRequest { @@ -106,6 +107,7 @@ struct TReportRequest { 10: optional list resource // only id and version 11: i32 num_cores 12: i32 pipeline_executor_size + 13: optional map partitions_version } struct TMasterResult { diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 4f101f1177e121f..171eb6c9e4c346e 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -222,7 +222,8 @@ 
enum TTaskType { PUSH_COOLDOWN_CONF, PUSH_STORAGE_POLICY, ALTER_INVERTED_INDEX, - GC_BINLOG + GC_BINLOG, + UPDATE_VISIBLE_VERSION, } enum TStmtType { diff --git a/regression-test/data/compaction/test_compaction_with_visible_version.out b/regression-test/data/compaction/test_compaction_with_visible_version.out new file mode 100644 index 000000000000000..54739ddf6657d52 --- /dev/null +++ b/regression-test/data/compaction/test_compaction_with_visible_version.out @@ -0,0 +1,259 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_1 -- +0 0 +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +7 70 +8 80 +9 90 +10 100 +11 110 +12 120 +13 130 +14 140 +15 150 +16 160 +17 170 +18 180 +19 190 +20 200 + +-- !select_2 -- +0 0 +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +7 70 +8 80 +9 90 +10 100 +11 110 +12 120 +13 130 +14 140 +15 150 +16 160 +17 170 +18 180 +19 190 +20 200 + +-- !select_3 -- +0 0 +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +7 70 +8 80 +9 90 +10 100 +11 110 +12 120 +13 130 +14 140 +15 150 +16 160 +17 170 +18 180 +19 190 +20 200 +21 210 +22 220 +23 230 +24 240 +25 250 +26 260 +27 270 +28 280 +29 290 +30 300 +31 310 +32 320 +33 330 +34 340 +35 350 +36 360 +37 370 +38 380 +39 390 +40 400 +41 410 + +-- !select_4 -- +0 0 +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +7 70 +8 80 +9 90 +10 100 +11 110 +12 120 +13 130 +14 140 +15 150 +16 160 +17 170 +18 180 +19 190 +20 200 +21 210 +22 220 +23 230 +24 240 +25 250 +26 260 +27 270 +28 280 +29 290 +30 300 +31 310 +32 320 +33 330 +34 340 +35 350 +36 360 +37 370 +38 380 +39 390 +40 400 +41 410 + +-- !select_5 -- +0 0 +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +7 70 +8 80 +9 90 +10 100 +11 110 +12 120 +13 130 +14 140 +15 150 +16 160 +17 170 +18 180 +19 190 +20 200 +21 210 +22 220 +23 230 +24 240 +25 250 +26 260 +27 270 +28 280 +29 290 +30 300 +31 310 +32 320 +33 330 +34 340 +35 350 +36 360 +37 370 +38 380 +39 390 +40 400 +41 410 +42 420 +43 430 +44 440 +45 450 +46 460 +47 470 +48 480 +49 490 +50 500 +51 510 +52 520 
+53 530 +54 540 +55 550 +56 560 +57 570 +58 580 +59 590 +60 600 +61 610 +62 620 +63 630 +64 640 +65 650 +66 660 +67 670 +68 680 +69 690 +70 700 +71 710 +72 720 +73 730 +74 740 +75 750 +76 760 +77 770 +78 780 +79 790 +80 800 +81 810 +82 820 +83 830 +84 840 +85 850 +86 860 +87 870 +88 880 +89 890 +90 900 +91 910 +92 920 +93 930 +94 940 +95 950 +96 960 +97 970 +98 980 +99 990 +100 1000 +101 1010 +102 1020 +103 1030 +104 1040 +105 1050 +106 1060 +107 1070 +108 1080 +109 1090 +110 1100 +111 1110 +112 1120 +113 1130 +114 1140 +115 1150 +116 1160 +117 1170 +118 1180 +119 1190 +120 1200 +121 1210 + diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy index 039913b7248257a..508038b8bf2a117 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy @@ -35,7 +35,10 @@ class ClusterOptions { int feNum = 1 int beNum = 3 List feConfigs = [] - List beConfigs = [] + List beConfigs = [ + 'report_disk_state_interval_seconds=2', + 'report_random_wait=false', + ] // each be disks, a disks format is: disk_type=disk_num[,disk_capacity] // here disk_type=HDD or SSD, disk capacity is in gb unit. diff --git a/regression-test/suites/compaction/test_compaction_with_visible_version.groovy b/regression-test/suites/compaction/test_compaction_with_visible_version.groovy new file mode 100644 index 000000000000000..2fea0d5b371db4c --- /dev/null +++ b/regression-test/suites/compaction/test_compaction_with_visible_version.groovy @@ -0,0 +1,259 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.Http +import org.apache.doris.regression.util.NodeType + +suite('test_compaction_with_visible_version') { // checks that BE compaction leaves published-but-not-yet-visible versions alone until compaction_keep_invisible_version_timeout_sec expires + def options = new ClusterOptions() + def compaction_keep_invisible_version_min_count = 50L + options.feConfigs += [ + 'partition_info_update_interval_secs=5', + ] + options.beConfigs += [ + 'disable_auto_compaction=true', + 'report_tablet_interval_seconds=1', + 'tablet_rowset_stale_sweep_by_size=true', + 'tablet_rowset_stale_sweep_threshold_size=0', + 'compaction_keep_invisible_version_timeout_sec=6000', + "compaction_keep_invisible_version_min_count=${compaction_keep_invisible_version_min_count}".toString(), + 'compaction_keep_invisible_version_max_count=500', + ] + options.enableDebugPoints() + + docker(options) { + def E_CUMULATIVE_NO_SUITABLE_VERSION = 'E-2000' + def E_FULL_MISS_VERSION = 'E-2009' + + sql 'SET GLOBAL insert_visible_timeout_ms = 3000' + + def tableName = 'test_compaction_with_visible_version' + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort) + + def triggerCompaction = { tablet, isCompactSucc, compaction_type -> // runs a 'base' or cumulative compaction on the tablet's backend and asserts the returned status: 'success' if isCompactSucc, else E-2009 (base) / E-2000 (cumulative) + def tabletId = tablet.TabletId + def backendId = tablet.BackendId + def backendIp = backendId_to_backendIP.get(backendId) + def backendHttpPort =
backendId_to_backendHttpPort.get(backendId) + def code + def out + def err + if (compaction_type == 'base') { + (code, out, err) = be_run_base_compaction(backendIp, backendHttpPort, tabletId) + } else { + (code, out, err) = be_run_cumulative_compaction(backendIp, backendHttpPort, tabletId) + } + logger.info("Run compaction: code=${code}, out=${out}, err=${err}") + assertEquals(0, code) + def compactJson = parseJson(out.trim()) + if (isCompactSucc) { + assertEquals('success', compactJson.status.toLowerCase()) + } else { + if (compaction_type == 'base') { + assertEquals(E_FULL_MISS_VERSION, compactJson.status) + } else { + assertEquals(E_CUMULATIVE_NO_SUITABLE_VERSION, compactJson.status) + } + } + } + + def waitCompaction = { tablet, startTs -> + def tabletId = tablet.TabletId + def backendId = tablet.BackendId + def backendIp = backendId_to_backendIP.get(backendId) + def backendHttpPort = backendId_to_backendHttpPort.get(backendId) + def running = true + while (running) { + assertTrue(System.currentTimeMillis() - startTs < 60 * 1000) + Thread.sleep(1000) + def (code, out, err) = be_get_compaction_status(backendIp, backendHttpPort, tabletId) + logger.info("Get compaction: code=${code}, out=${out}, err=${err}") + assertEquals(0, code) + + def compactionStatus = parseJson(out.trim()) + assertEquals('success', compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } + } + + def checkCompact = { isCumuCompactSucc, runBaseCompact, isInvisibleTimeout, version, visibleVersion -> + def partition = sql_return_maparray("SHOW PARTITIONS FROM ${tableName}")[0] + assertEquals(visibleVersion, partition.VisibleVersion as long) + + // wait for BE to report its version counts; the counts recorded below are the pre-compaction baseline + Thread.sleep(3 * 1000) + def tablets = sql_return_maparray "SHOW TABLETS FROM ${tableName}" + def lastVisibleVersionCountMap = [:] + tablets.each { + lastVisibleVersionCountMap[it.BackendId] = it.VisibleVersionCount as long + triggerCompaction it, isCumuCompactSucc, 'cumulative' + } + + if
(isCumuCompactSucc) { + // wait for the cumulative compaction to finish + def startTs = System.currentTimeMillis() + tablets.each { + waitCompaction it, startTs + } + } + + if (runBaseCompact) { + tablets.each { + triggerCompaction it, true, 'base' + } + + def startTs = System.currentTimeMillis() + tablets.each { + waitCompaction it, startTs + } + } + + // wait for the next BE report so SHOW TABLETS reflects the post-compaction counts + Thread.sleep(3 * 1000) + + tablets = sql_return_maparray "SHOW TABLETS FROM ${tableName}" + tablets.each { + def backendId = it.BackendId + def visibleVersionCount = it.VisibleVersionCount as long + def totalVersionCount = it.TotalVersionCount as long + def invisibleVersionCount = totalVersionCount - visibleVersionCount + assertEquals(version, it.Version as long) + + if (isInvisibleTimeout) { + // part of the invisible versions were compacted: at most compaction_keep_invisible_version_min_count remain + assertEquals(Math.min(version - visibleVersion, compaction_keep_invisible_version_min_count), + invisibleVersionCount) + } else { + // invisible versions must not have been compacted + assertEquals(version - visibleVersion, invisibleVersionCount) + } + + def lastVisibleVersionCount = lastVisibleVersionCountMap.get(backendId) + if (isCumuCompactSucc) { + if (runBaseCompact) { + assertEquals(1L, visibleVersionCount) + } else { + assertTrue(lastVisibleVersionCount > visibleVersionCount, + "not met with: lastVisibleVersionCount ${lastVisibleVersionCount} > " + + "visibleVersionCount ${visibleVersionCount}") + } + } else { + assertEquals(lastVisibleVersionCount, visibleVersionCount) + } + } + } + + sql " CREATE TABLE ${tableName} (k1 int, k2 int) DISTRIBUTED BY HASH(k1) BUCKETS 1 " + + // normal case: every inserted version becomes visible + def rowNum = 0 + def insertNRecords = { num -> + // when debug point DatabaseTransactionMgr.stop_finish_transaction is enabled, + // each insert has to wait insert_visible_timeout_ms, + // so run the inserts in multiple threads to reduce the total wait time.
+ def futures = [] + for (def i = 0; i < num; i++, rowNum++) { + def index = rowNum + futures.add(thread { + sql " INSERT INTO ${tableName} VALUES (${index}, ${index * 10}) " + }) + } + futures.each { it.get() } + } + insertNRecords(21) + // after inserting 21 rows, BE can compact successfully. + checkCompact(true, false, false, rowNum + 1, rowNum + 1) + qt_select_1 "SELECT * FROM ${tableName} ORDER BY k1" + + // published but not visible + def lastRowNum = rowNum + cluster.injectDebugPoints(NodeType.FE, ['DatabaseTransactionMgr.stop_finish_transaction':null]) + insertNRecords(21) + // with the debug point enabled, BE adds rowsets but the visible version does not increase, + // so no rowsets can be picked for compaction + // and the compaction request is expected to fail. + checkCompact(false, false, false, rowNum + 1, lastRowNum + 1) + qt_select_2 "SELECT * FROM ${tableName} ORDER BY k1" + + cluster.clearFrontendDebugPoints() + Thread.sleep(5000) + // after clearing the debug point, the visible version increases, + // so some rowsets can be picked for compaction + // and the compaction is expected to succeed. + checkCompact(true, true, false, rowNum + 1, rowNum + 1) + qt_select_3 "SELECT * FROM ${tableName} ORDER BY k1" + + lastRowNum = rowNum + cluster.injectDebugPoints(NodeType.FE, ['DatabaseTransactionMgr.stop_finish_transaction':null]) + insertNRecords(80) + // the 80 new versions are not visible yet, so BE will not compact them. + // if we request compaction over HTTP, BE replies that no rowsets can be compacted now + checkCompact(false, false, false, rowNum + 1, lastRowNum + 1) + // because BE did not compact them, the query should still succeed.
+ qt_select_4 "SELECT * FROM ${tableName} ORDER BY k1" + + // after the timeout, BE compacts versions even if they are not visible yet; + // after that the query will hit the E-230 error + update_all_be_config('compaction_keep_invisible_version_timeout_sec', 1) + checkCompact(true, true, true, rowNum + 1, lastRowNum + 1) + + test { + sql "SELECT * FROM ${tableName} ORDER BY k1" + + // E-230: + //(1105, 'errCode = 2, detailMessage = (128.2.51.2)[CANCELLED]missed_versions is empty, spec_version 43, + // max_version 123, tablet_id 10062') + exception 'missed_versions is empty' + } + + def getVersionCountMap = { -> // maps BackendId -> [VisibleVersionCount, TotalVersionCount] as shown by SHOW TABLETS + def versionCountMap = [:] + def tablets = sql_return_maparray "SHOW TABLETS FROM ${tableName}" + tablets.each { + versionCountMap.put(it.BackendId as long, [it.VisibleVersionCount as long, it.TotalVersionCount as long]) + } + return versionCountMap + } + + // after a backend restarts, it should update its visible version from FE + // and then report its visible version count and total version count + def oldVersionCountMap = getVersionCountMap() + cluster.restartBackends() + Thread.sleep(20000) + def newVersionCountMap = getVersionCountMap() + assertEquals(oldVersionCountMap, newVersionCountMap) + + test { + sql "SELECT * FROM ${tableName} ORDER BY k1" + + // E-230: + //(1105, 'errCode = 2, detailMessage = (128.2.51.2)[CANCELLED]missed_versions is empty, spec_version 43, + // max_version 123, tablet_id 10062') + exception 'missed_versions is empty' + } + + cluster.clearFrontendDebugPoints() + Thread.sleep(5000) + // after clearing FE's debug point, the 80 versions are visible now, + // so compaction succeeds + checkCompact(true, false, false, rowNum + 1, rowNum + 1) + qt_select_5 "SELECT * FROM ${tableName} ORDER BY k1" + } +}