diff --git a/be/src/http/action/compaction_score_action.cpp b/be/src/http/action/compaction_score_action.cpp index ab660656bcab650..04b742f4b0d77ad 100644 --- a/be/src/http/action/compaction_score_action.cpp +++ b/be/src/http/action/compaction_score_action.cpp @@ -22,14 +22,27 @@ #include #include +#include +#include #include #include +#include +#include +#include #include +#include +#include #include #include +#include +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet.h" +#include "cloud/cloud_tablet_mgr.h" +#include "cloud/config.h" #include "common/status.h" #include "http/http_channel.h" +#include "http/http_handler_with_auth.h" #include "http/http_headers.h" #include "http/http_request.h" #include "http/http_status.h" @@ -38,30 +51,107 @@ namespace doris { +const std::string TOP_N = "topn_n"; +const std::string SYNC_META = "sync_meta"; +const std::string COMPACTION_SCORE = "compaction_score"; +constexpr size_t DEFAULT_TOP_N = std::numeric_limits::max(); +constexpr bool DEFAULT_SYNC_META = false; constexpr std::string_view TABLET_ID = "tablet_id"; -constexpr std::string_view COMPACTION_SCORE = "compaction_score"; -CompactionScoreAction::CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier, - TPrivilegeType::type type, - StorageEngine& storage_engine) - : HttpHandlerWithAuth(exec_env, hier, type), _storage_engine(storage_engine) {} +struct CompactionScoreResult { + int64_t tablet_id; + size_t compaction_score; +}; + +bool operator>(const CompactionScoreResult& lhs, const CompactionScoreResult& rhs) { + return lhs.compaction_score > rhs.compaction_score; +} + +template +concept CompactionScoreAccessble = requires(T t) { + { t.get_real_compaction_score() } -> std::same_as; +}; + +template +std::vector calculate_compaction_scores( + std::span> tablets) { + std::vector result; + result.reserve(tablets.size()); + std::ranges::transform(tablets, std::back_inserter(result), + [](const std::shared_ptr& tablet) -> CompactionScoreResult { + return {.tablet_id = tablet->tablet_id(), + .compaction_score = tablet->get_real_compaction_score()}; + }); + return result; +} + +struct CompactionScoresAccessor { + virtual ~CompactionScoresAccessor() = default; + + virtual std::vector get_all_tablet_compaction_scores() = 0; +}; + +struct LocalCompactionScoreAccessor final : CompactionScoresAccessor { + LocalCompactionScoreAccessor(TabletManager* tablet_mgr) : tablet_mgr(tablet_mgr) {} + + std::vector get_all_tablet_compaction_scores() override { + DCHECK_NOTNULL(tablet_mgr); + auto tablets = tablet_mgr->get_all_tablet(); + std::span s = {tablets.begin(), tablets.end()}; + return calculate_compaction_scores(s); + } + + TabletManager* tablet_mgr; +}; + +struct CloudCompactionScoresAccessor final : CompactionScoresAccessor { + CloudCompactionScoresAccessor(CloudTabletMgr& tablet_mgr) : tablet_mgr(tablet_mgr) {} + + std::vector get_all_tablet_compaction_scores() override { + auto tablets = get_all_tablets(); + std::span s = {tablets.begin(), tablets.end()}; + return calculate_compaction_scores(s); + } + + Status sync_meta() { + auto tablets = get_all_tablets(); + for (const auto& tablet : tablets) { + RETURN_IF_ERROR(tablet->sync_meta()); + RETURN_IF_ERROR(tablet->sync_rowsets()); + } + return Status::OK(); + } + + std::vector get_all_tablets() { + auto weak_tablets = tablet_mgr.get_weak_tablets(); + std::vector tablets; + tablets.reserve(weak_tablets.size()); + for (auto& weak_tablet : weak_tablets) { + if (auto tablet = weak_tablet.lock()) { + tablets.push_back(std::move(tablet)); + } + } + return tablets; + } + + CloudTabletMgr& tablet_mgr; +}; static rapidjson::Value jsonfy_tablet_compaction_score( - const TabletSharedPtr& tablet, rapidjson::MemoryPoolAllocator<>& allocator) { + const CompactionScoreResult& result, rapidjson::MemoryPoolAllocator<>& allocator) { rapidjson::Value node; node.SetObject(); rapidjson::Value tablet_id_key; tablet_id_key.SetString(TABLET_ID.data(), TABLET_ID.length(), allocator); rapidjson::Value tablet_id_val; - auto tablet_id_str = std::to_string(tablet->tablet_id()); + auto tablet_id_str = std::to_string(result.tablet_id); tablet_id_val.SetString(tablet_id_str.c_str(), tablet_id_str.length(), allocator); rapidjson::Value score_key; score_key.SetString(COMPACTION_SCORE.data(), COMPACTION_SCORE.size()); rapidjson::Value score_val; - auto score = tablet->get_real_compaction_score(); - auto score_str = std::to_string(score); + auto score_str = std::to_string(result.compaction_score); score_val.SetString(score_str.c_str(), score_str.length(), allocator); node.AddMember(score_key, score_val, allocator); @@ -69,42 +159,74 @@ static rapidjson::Value jsonfy_tablet_compaction_score( return node; } +CompactionScoreAction::CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier, + TPrivilegeType::type type, TabletManager* tablet_mgr) + : HttpHandlerWithAuth(exec_env, hier, type), + _accessor(std::make_unique(tablet_mgr)) {} + +CompactionScoreAction::CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier, + TPrivilegeType::type type, CloudTabletMgr& tablet_mgr) + : HttpHandlerWithAuth(exec_env, hier, type), + _accessor(std::make_unique(tablet_mgr)) {} + void CompactionScoreAction::handle(HttpRequest* req) { + req->add_output_header(HttpHeaders::CONTENT_TYPE, HttpHeaders::JsonType.data()); + auto top_n_param = req->param(TOP_N); + + size_t top_n = DEFAULT_TOP_N; + if (!top_n_param.empty()) { + try { + top_n = std::stoll(top_n_param); + } catch (const std::exception& e) { + LOG(WARNING) << "convert failed:" << e.what(); + auto msg = std::format("invalid argument: top_n={}", top_n_param); + HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, msg); + return; + } + } + + auto sync_meta_param = req->param(SYNC_META); + bool sync_meta = DEFAULT_SYNC_META; + if (!sync_meta_param.empty() and !config::is_cloud_mode()) { + HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, + "sync meta is only available for cloud mode"); + return; + } + if (sync_meta_param == "true") { + sync_meta = true; + } else if (sync_meta_param == "false") { + sync_meta = false; + } else { + auto msg = std::format("invalid argument: sync_meta={}", sync_meta_param); + HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, msg); + return; + } + std::string result; - if (auto st = _handle(req, &result); !st) { + if (auto st = _handle(top_n, sync_meta, &result); !st) { HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, st.to_json()); + return; } HttpChannel::send_reply(req, HttpStatus::OK, result); } -Status CompactionScoreAction::_handle(HttpRequest* req, std::string* result) { - req->add_output_header(HttpHeaders::CONTENT_TYPE, HttpHeaders::JsonType.data()); - auto tablet_id_param = req->param(TABLET_ID.data()); +Status CompactionScoreAction::_handle(size_t top_n, bool sync_meta, std::string* result) { + if (sync_meta) { + DCHECK(config::is_cloud_mode()); + RETURN_IF_ERROR(static_cast(_accessor.get())->sync_meta()); + } + + auto scores = _accessor->get_all_tablet_compaction_scores(); + top_n = std::min(top_n, scores.size()); + std::partial_sort(scores.begin(), scores.begin() + top_n, scores.end(), std::greater<>()); + rapidjson::Document root; - if (tablet_id_param.empty()) { - // fetch comapction scores from all tablets - // [{tablet_id: xxx, base_compaction_score: xxx, cumu_compaction_score: xxx}, ...] - auto tablets = _storage_engine.tablet_manager()->get_all_tablet(); - root.SetArray(); - auto& allocator = root.GetAllocator(); - for (const auto& tablet : tablets) { - root.PushBack(jsonfy_tablet_compaction_score(tablet, allocator), allocator); - } - } else { - // {tablet_id: xxx, base_compaction_score: xxx, cumu_compaction_score: xxx} - int64_t tablet_id; - try { - tablet_id = std::stoll(tablet_id_param); - } catch (const std::exception& e) { - LOG(WARNING) << "convert failed:" << e.what(); - return Status::InvalidArgument("invalid argument: tablet_id={}", tablet_id_param); - } - auto base_tablet = DORIS_TRY(_storage_engine.get_tablet(tablet_id)); - auto tablet = std::static_pointer_cast(base_tablet); - root.SetObject(); - auto val = jsonfy_tablet_compaction_score(tablet, root.GetAllocator()); - root.Swap(val); + root.SetArray(); + auto& allocator = root.GetAllocator(); + for (const auto& e : scores | std::views::take(top_n)) { + root.PushBack(jsonfy_tablet_compaction_score(e, allocator), allocator); } + rapidjson::StringBuffer str_buf; rapidjson::PrettyWriter writer(str_buf); root.Accept(writer); diff --git a/be/src/http/action/compaction_score_action.h b/be/src/http/action/compaction_score_action.h index 1738c1d5ab9cb00..ded30d2656155bb 100644 --- a/be/src/http/action/compaction_score_action.h +++ b/be/src/http/action/compaction_score_action.h @@ -17,26 +17,37 @@ #pragma once +#include + +#include +#include #include +#include "cloud/cloud_tablet_mgr.h" #include "common/status.h" #include "http/http_handler_with_auth.h" #include "http/http_request.h" #include "olap/storage_engine.h" +#include "runtime/exec_env.h" namespace doris { +struct CompactionScoresAccessor; + // topn, sync class CompactionScoreAction : public HttpHandlerWithAuth { public: - CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier, TPrivilegeType::type type, - StorageEngine& storage_engine); + explicit CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier, + TPrivilegeType::type type, TabletManager* tablet_mgr); + + explicit CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier, + TPrivilegeType::type type, CloudTabletMgr& tablet_mgr); void handle(HttpRequest* req) override; private: - Status _handle(HttpRequest* req, std::string* result); + Status _handle(size_t top_n, bool sync_meta, std::string* result); - StorageEngine& _storage_engine; + std::unique_ptr _accessor; }; } // namespace doris diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index e708d32ea9d2fa0..f2c325bebc78069 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -384,8 +384,8 @@ void HttpService::register_local_handler(StorageEngine& engine) { _ev_http_server->register_handler(HttpMethod::GET, "/api/show_nested_index_file", show_nested_index_file_action); - CompactionScoreAction* compaction_score_action = _pool.add( - new CompactionScoreAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, engine)); + CompactionScoreAction* compaction_score_action = _pool.add(new CompactionScoreAction( + _env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, engine.tablet_manager())); _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction_score", compaction_score_action); } @@ -424,6 +424,10 @@ void HttpService::register_cloud_handler(CloudStorageEngine& engine) { new ShowNestedIndexFileAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); _ev_http_server->register_handler(HttpMethod::GET, "/api/show_nested_index_file", show_nested_index_file_action); + CompactionScoreAction* compaction_score_action = _pool.add(new CompactionScoreAction( + _env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, engine.tablet_mgr())); + _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction_score", + compaction_score_action); } // NOLINTEND(readability-function-size)