Skip to content

Commit

Permalink
refactor to and cloud compaction score
Browse files Browse the repository at this point in the history
  • Loading branch information
TangSiyang2001 committed Aug 27, 2024
1 parent 7b009fa commit 646d360
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 42 deletions.
194 changes: 158 additions & 36 deletions be/src/http/action/compaction_score_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,27 @@
#include <rapidjson/prettywriter.h>
#include <rapidjson/stringbuffer.h>

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <functional>
#include <iterator>
#include <limits>
#include <memory>
#include <ranges>
#include <span>
#include <string>
#include <string_view>
#include <vector>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/config.h"
#include "common/status.h"
#include "http/http_channel.h"
#include "http/http_handler_with_auth.h"
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_status.h"
Expand All @@ -38,73 +51,182 @@

namespace doris {

const std::string TOP_N = "topn_n";
const std::string SYNC_META = "sync_meta";
const std::string COMPACTION_SCORE = "compaction_score";
constexpr size_t DEFAULT_TOP_N = std::numeric_limits<size_t>::max();
constexpr bool DEFAULT_SYNC_META = false;
constexpr std::string_view TABLET_ID = "tablet_id";
constexpr std::string_view COMPACTION_SCORE = "compaction_score";

CompactionScoreAction::CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type type,
StorageEngine& storage_engine)
: HttpHandlerWithAuth(exec_env, hier, type), _storage_engine(storage_engine) {}
struct CompactionScoreResult {
int64_t tablet_id;
size_t compaction_score;
};

bool operator>(const CompactionScoreResult& lhs, const CompactionScoreResult& rhs) {
return lhs.compaction_score > rhs.compaction_score;
}

template <typename T>
concept CompactionScoreAccessble = requires(T t) {
{ t.get_real_compaction_score() } -> std::same_as<uint32_t>;
};

template <CompactionScoreAccessble T>
std::vector<CompactionScoreResult> calculate_compaction_scores(
std::span<std::shared_ptr<T>> tablets) {
std::vector<CompactionScoreResult> result;
result.reserve(tablets.size());
std::ranges::transform(tablets, std::back_inserter(result),
[](const std::shared_ptr<T>& tablet) -> CompactionScoreResult {
return {.tablet_id = tablet->tablet_id(),
.compaction_score = tablet->get_real_compaction_score()};
});
return result;
}

struct CompactionScoresAccessor {
virtual ~CompactionScoresAccessor() = default;

virtual std::vector<CompactionScoreResult> get_all_tablet_compaction_scores() = 0;
};

struct LocalCompactionScoreAccessor final : CompactionScoresAccessor {
LocalCompactionScoreAccessor(TabletManager* tablet_mgr) : tablet_mgr(tablet_mgr) {}

std::vector<CompactionScoreResult> get_all_tablet_compaction_scores() override {
DCHECK_NOTNULL(tablet_mgr);
auto tablets = tablet_mgr->get_all_tablet();
std::span<TabletSharedPtr> s = {tablets.begin(), tablets.end()};
return calculate_compaction_scores(s);
}

TabletManager* tablet_mgr;
};

struct CloudCompactionScoresAccessor final : CompactionScoresAccessor {
CloudCompactionScoresAccessor(CloudTabletMgr& tablet_mgr) : tablet_mgr(tablet_mgr) {}

std::vector<CompactionScoreResult> get_all_tablet_compaction_scores() override {
auto tablets = get_all_tablets();
std::span<CloudTabletSPtr> s = {tablets.begin(), tablets.end()};
return calculate_compaction_scores(s);
}

Status sync_meta() {
auto tablets = get_all_tablets();
for (const auto& tablet : tablets) {
RETURN_IF_ERROR(tablet->sync_meta());
RETURN_IF_ERROR(tablet->sync_rowsets());
}
return Status::OK();
}

std::vector<CloudTabletSPtr> get_all_tablets() {
auto weak_tablets = tablet_mgr.get_weak_tablets();
std::vector<CloudTabletSPtr> tablets;
tablets.reserve(weak_tablets.size());
for (auto& weak_tablet : weak_tablets) {
if (auto tablet = weak_tablet.lock()) {
tablets.push_back(std::move(tablet));
}
}
return tablets;
}

CloudTabletMgr& tablet_mgr;
};

static rapidjson::Value jsonfy_tablet_compaction_score(
const TabletSharedPtr& tablet, rapidjson::MemoryPoolAllocator<>& allocator) {
const CompactionScoreResult& result, rapidjson::MemoryPoolAllocator<>& allocator) {
rapidjson::Value node;
node.SetObject();

rapidjson::Value tablet_id_key;
tablet_id_key.SetString(TABLET_ID.data(), TABLET_ID.length(), allocator);
rapidjson::Value tablet_id_val;
auto tablet_id_str = std::to_string(tablet->tablet_id());
auto tablet_id_str = std::to_string(result.tablet_id);
tablet_id_val.SetString(tablet_id_str.c_str(), tablet_id_str.length(), allocator);

rapidjson::Value score_key;
score_key.SetString(COMPACTION_SCORE.data(), COMPACTION_SCORE.size());
rapidjson::Value score_val;
auto score = tablet->get_real_compaction_score();
auto score_str = std::to_string(score);
auto score_str = std::to_string(result.compaction_score);
score_val.SetString(score_str.c_str(), score_str.length(), allocator);
node.AddMember(score_key, score_val, allocator);

node.AddMember(tablet_id_key, tablet_id_val, allocator);
return node;
}

CompactionScoreAction::CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type type, TabletManager* tablet_mgr)
: HttpHandlerWithAuth(exec_env, hier, type),
_accessor(std::make_unique<LocalCompactionScoreAccessor>(tablet_mgr)) {}

CompactionScoreAction::CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type type, CloudTabletMgr& tablet_mgr)
: HttpHandlerWithAuth(exec_env, hier, type),
_accessor(std::make_unique<CloudCompactionScoresAccessor>(tablet_mgr)) {}

void CompactionScoreAction::handle(HttpRequest* req) {
req->add_output_header(HttpHeaders::CONTENT_TYPE, HttpHeaders::JsonType.data());
auto top_n_param = req->param(TOP_N);

size_t top_n = DEFAULT_TOP_N;
if (!top_n_param.empty()) {
try {
top_n = std::stoll(top_n_param);
} catch (const std::exception& e) {
LOG(WARNING) << "convert failed:" << e.what();
auto msg = std::format("invalid argument: top_n={}", top_n_param);
HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, msg);
return;
}
}

auto sync_meta_param = req->param(SYNC_META);
bool sync_meta = DEFAULT_SYNC_META;
if (!sync_meta_param.empty() and !config::is_cloud_mode()) {
HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST,
"sync meta is only available for cloud mode");
return;
}
if (sync_meta_param == "true") {
sync_meta = true;
} else if (sync_meta_param == "false") {
sync_meta = false;
} else {
auto msg = std::format("invalid argument: sync_meta={}", sync_meta_param);
HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, msg);
return;
}

std::string result;
if (auto st = _handle(req, &result); !st) {
if (auto st = _handle(top_n, sync_meta, &result); !st) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, st.to_json());
return;
}
HttpChannel::send_reply(req, HttpStatus::OK, result);
}

Status CompactionScoreAction::_handle(HttpRequest* req, std::string* result) {
req->add_output_header(HttpHeaders::CONTENT_TYPE, HttpHeaders::JsonType.data());
auto tablet_id_param = req->param(TABLET_ID.data());
Status CompactionScoreAction::_handle(size_t top_n, bool sync_meta, std::string* result) {
if (sync_meta) {
DCHECK(config::is_cloud_mode());
RETURN_IF_ERROR(static_cast<CloudCompactionScoresAccessor*>(_accessor.get())->sync_meta());
}

auto scores = _accessor->get_all_tablet_compaction_scores();
top_n = std::min(top_n, scores.size());
std::partial_sort(scores.begin(), scores.begin() + top_n, scores.end(), std::greater<>());

rapidjson::Document root;
if (tablet_id_param.empty()) {
// fetch comapction scores from all tablets
// [{tablet_id: xxx, base_compaction_score: xxx, cumu_compaction_score: xxx}, ...]
auto tablets = _storage_engine.tablet_manager()->get_all_tablet();
root.SetArray();
auto& allocator = root.GetAllocator();
for (const auto& tablet : tablets) {
root.PushBack(jsonfy_tablet_compaction_score(tablet, allocator), allocator);
}
} else {
// {tablet_id: xxx, base_compaction_score: xxx, cumu_compaction_score: xxx}
int64_t tablet_id;
try {
tablet_id = std::stoll(tablet_id_param);
} catch (const std::exception& e) {
LOG(WARNING) << "convert failed:" << e.what();
return Status::InvalidArgument("invalid argument: tablet_id={}", tablet_id_param);
}
auto base_tablet = DORIS_TRY(_storage_engine.get_tablet(tablet_id));
auto tablet = std::static_pointer_cast<Tablet>(base_tablet);
root.SetObject();
auto val = jsonfy_tablet_compaction_score(tablet, root.GetAllocator());
root.Swap(val);
root.SetArray();
auto& allocator = root.GetAllocator();
for (const auto& e : scores | std::views::take(top_n)) {
root.PushBack(jsonfy_tablet_compaction_score(e, allocator), allocator);
}

rapidjson::StringBuffer str_buf;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(str_buf);
root.Accept(writer);
Expand Down
19 changes: 15 additions & 4 deletions be/src/http/action/compaction_score_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,37 @@

#pragma once

#include <gen_cpp/FrontendService_types.h>

#include <cstddef>
#include <memory>
#include <string>

#include "cloud/cloud_tablet_mgr.h"
#include "common/status.h"
#include "http/http_handler_with_auth.h"
#include "http/http_request.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
namespace doris {

struct CompactionScoresAccessor;

// topn, sync
class CompactionScoreAction : public HttpHandlerWithAuth {
public:
CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier, TPrivilegeType::type type,
StorageEngine& storage_engine);
explicit CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type type, TabletManager* tablet_mgr);

explicit CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type type, CloudTabletMgr& tablet_mgr);

void handle(HttpRequest* req) override;

private:
Status _handle(HttpRequest* req, std::string* result);
Status _handle(size_t top_n, bool sync_meta, std::string* result);

StorageEngine& _storage_engine;
std::unique_ptr<CompactionScoresAccessor> _accessor;
};

} // namespace doris
8 changes: 6 additions & 2 deletions be/src/service/http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,8 @@ void HttpService::register_local_handler(StorageEngine& engine) {
_ev_http_server->register_handler(HttpMethod::GET, "/api/show_nested_index_file",
show_nested_index_file_action);

CompactionScoreAction* compaction_score_action = _pool.add(
new CompactionScoreAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, engine));
CompactionScoreAction* compaction_score_action = _pool.add(new CompactionScoreAction(
_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, engine.tablet_manager()));
_ev_http_server->register_handler(HttpMethod::GET, "/api/compaction_score",
compaction_score_action);
}
Expand Down Expand Up @@ -424,6 +424,10 @@ void HttpService::register_cloud_handler(CloudStorageEngine& engine) {
new ShowNestedIndexFileAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
_ev_http_server->register_handler(HttpMethod::GET, "/api/show_nested_index_file",
show_nested_index_file_action);
CompactionScoreAction* compaction_score_action = _pool.add(new CompactionScoreAction(
_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, engine.tablet_mgr()));
_ev_http_server->register_handler(HttpMethod::GET, "/api/compaction_score",
compaction_score_action);
}
// NOLINTEND(readability-function-size)

Expand Down

0 comments on commit 646d360

Please sign in to comment.