Skip to content

Commit

Permalink
[feature](compaction) Add an http action for visibility of compaction…
Browse files Browse the repository at this point in the history
… score on each tablet (#38489)

## Proposed changes

As title.

Usage:
1. `curl http://be_ip:be_port/api/compaction_score?top_n=10`
Returns a JSON array containing the `top_n` tablets with the highest compaction scores.
```
[
    {
        "compaction_score": "5",
        "tablet_id": "42595"
    },
    {
        "compaction_score": "5",
        "tablet_id": "42587"
    },
    {
        "compaction_score": "5",
        "tablet_id": "42593"
    },
    {
        "compaction_score": "5",
        "tablet_id": "42597"
    },
    {
        "compaction_score": "5",
        "tablet_id": "42589"
    },
    {
        "compaction_score": "5",
        "tablet_id": "42599"
    },
    {
        "compaction_score": "5",
        "tablet_id": "42601"
    },
    {
        "compaction_score": "5",
        "tablet_id": "42591"
    },
    {
        "compaction_score": "5",
        "tablet_id": "42585"
    },
    {
        "compaction_score": "4",
        "tablet_id": "10034"
    }
]
```
If `top_n` is not specified, the compaction scores of all tablets are returned.
If `top_n` is invalid, an error is returned:
```
invalid argument: top_n=wrong
```

2. `curl http://be_ip:be_port/api/compaction_score?sync_meta=true`
`sync_meta` is only available in cloud mode; when set to `true`, tablet meta is synced
from the meta service before the scores are computed. It can be combined with `top_n`.
If `sync_meta` is passed in non-cloud mode, an error is returned:
```
param `sync_meta` is only available for cloud mode
```

3. In the future, this endpoint may be extended with other utilities, such as fetching
tablet compaction scores by table id.
  • Loading branch information
TangSiyang2001 authored and dataroaring committed Oct 7, 2024
1 parent 4f39cfd commit fcb8619
Show file tree
Hide file tree
Showing 7 changed files with 382 additions and 0 deletions.
236 changes: 236 additions & 0 deletions be/src/http/action/compaction_score_action.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "http/action/compaction_score_action.h"

#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <rapidjson/document.h>
#include <rapidjson/prettywriter.h>
#include <rapidjson/stringbuffer.h>

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <functional>
#include <iterator>
#include <limits>
#include <memory>
#include <span>
#include <stdexcept>
#include <string>
#include <string_view>
#include <vector>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/config.h"
#include "common/status.h"
#include "http/http_channel.h"
#include "http/http_handler_with_auth.h"
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_status.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_manager.h"
#include "util/stopwatch.hpp"

namespace doris {

// Names of the query parameters understood by the compaction score endpoint.
const std::string TOP_N {"top_n"};
const std::string SYNC_META {"sync_meta"};

// Values used when the corresponding query parameter is absent.
// The largest `size_t` stands for "no limit"; `_handle` clamps it to the number of scores.
constexpr size_t DEFAULT_TOP_N = std::numeric_limits<size_t>::max();
constexpr bool DEFAULT_SYNC_META = false;

// Member names of each object in the JSON response.
const std::string COMPACTION_SCORE {"compaction_score"};
constexpr std::string_view TABLET_ID = "tablet_id";

// Satisfied by any type whose `get_real_compaction_score()` member call yields exactly
// `uint32_t`.
template <typename TabletT>
concept CompactionScoreAccessble = requires(TabletT tablet) {
    { tablet.get_real_compaction_score() } -> std::same_as<uint32_t>;
};

// Builds one `CompactionScoreResult` per entry of `tablets`, in input order. Each result
// pairs the tablet's `tablet_id()` with its `get_real_compaction_score()`.
template <CompactionScoreAccessble T>
std::vector<CompactionScoreResult> calculate_compaction_scores(
        std::span<std::shared_ptr<T>> tablets) {
    std::vector<CompactionScoreResult> scores;
    scores.reserve(tablets.size());
    for (const std::shared_ptr<T>& tablet : tablets) {
        scores.push_back(
                CompactionScoreResult {.tablet_id = tablet->tablet_id(),
                                       .compaction_score = tablet->get_real_compaction_score()});
    }
    return scores;
}

struct LocalCompactionScoreAccessor final : CompactionScoresAccessor {
LocalCompactionScoreAccessor(TabletManager* tablet_mgr) : tablet_mgr(tablet_mgr) {}

std::vector<CompactionScoreResult> get_all_tablet_compaction_scores() override {
auto tablets = tablet_mgr->get_all_tablet();
std::span<TabletSharedPtr> s = {tablets.begin(), tablets.end()};
return calculate_compaction_scores(s);
}

TabletManager* tablet_mgr;
};

struct CloudCompactionScoresAccessor final : CompactionScoresAccessor {
CloudCompactionScoresAccessor(CloudTabletMgr& tablet_mgr) : tablet_mgr(tablet_mgr) {}

std::vector<CompactionScoreResult> get_all_tablet_compaction_scores() override {
auto tablets = get_all_tablets();
std::span<CloudTabletSPtr> s = {tablets.begin(), tablets.end()};
return calculate_compaction_scores(s);
}

Status sync_meta() {
auto tablets = get_all_tablets();
LOG(INFO) << "start to sync meta from ms";

MonotonicStopWatch stopwatch;
stopwatch.start();

for (const auto& tablet : tablets) {
RETURN_IF_ERROR(tablet->sync_meta());
RETURN_IF_ERROR(tablet->sync_rowsets());
}

stopwatch.stop();
LOG(INFO) << "sync meta finish, time=" << stopwatch.elapsed_time() << "ns";

return Status::OK();
}

std::vector<CloudTabletSPtr> get_all_tablets() {
auto weak_tablets = tablet_mgr.get_weak_tablets();
std::vector<CloudTabletSPtr> tablets;
tablets.reserve(weak_tablets.size());
for (auto& weak_tablet : weak_tablets) {
if (auto tablet = weak_tablet.lock();
tablet != nullptr and tablet->tablet_state() == TABLET_RUNNING) {
tablets.push_back(std::move(tablet));
}
}
return tablets;
}

CloudTabletMgr& tablet_mgr;
};

// Converts one result into a JSON object with two string-valued members, added in this
// order: "compaction_score", then "tablet_id". Both numbers are emitted as decimal strings.
static rapidjson::Value jsonfy_tablet_compaction_score(
        const CompactionScoreResult& result, rapidjson::MemoryPoolAllocator<>& allocator) {
    const std::string score_text = std::to_string(result.compaction_score);
    const std::string tablet_id_text = std::to_string(result.tablet_id);

    // "compaction_score" member. No allocator is passed for the key, unlike the other
    // strings below. NOTE(review): presumably rapidjson then references the file-level
    // COMPACTION_SCORE buffer instead of copying it — confirm against the rapidjson docs.
    rapidjson::Value score_key;
    score_key.SetString(COMPACTION_SCORE.data(), COMPACTION_SCORE.size());
    rapidjson::Value score_val;
    score_val.SetString(score_text.c_str(), score_text.length(), allocator);

    // "tablet_id" member.
    rapidjson::Value tablet_id_key;
    tablet_id_key.SetString(TABLET_ID.data(), TABLET_ID.length(), allocator);
    rapidjson::Value tablet_id_val;
    tablet_id_val.SetString(tablet_id_text.c_str(), tablet_id_text.length(), allocator);

    rapidjson::Value node;
    node.SetObject();
    node.AddMember(score_key, score_val, allocator);
    node.AddMember(tablet_id_key, tablet_id_val, allocator);
    return node;
}

// Builds a handler whose scores come from a LocalCompactionScoreAccessor wrapping
// `tablet_mgr`. The remaining arguments are forwarded unchanged to HttpHandlerWithAuth.
CompactionScoreAction::CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type type, TabletManager* tablet_mgr)
: HttpHandlerWithAuth(exec_env, hier, type),
_accessor(std::make_unique<LocalCompactionScoreAccessor>(tablet_mgr)) {}

// Builds a handler whose scores come from a CloudCompactionScoresAccessor wrapping
// `tablet_mgr`. The remaining arguments are forwarded unchanged to HttpHandlerWithAuth.
CompactionScoreAction::CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type type, CloudTabletMgr& tablet_mgr)
: HttpHandlerWithAuth(exec_env, hier, type),
_accessor(std::make_unique<CloudCompactionScoresAccessor>(tablet_mgr)) {}

// Entry point for the compaction score endpoint.
//
// Query parameters read from `req`:
//   - `top_n`:     optional; must be a non-negative integer with no trailing characters.
//                  When absent, DEFAULT_TOP_N is used.
//   - `sync_meta`: optional; must be exactly "true" or "false", and may only be supplied
//                  when `config::is_cloud_mode()` is true. When absent, DEFAULT_SYNC_META
//                  is used.
//
// Replies with BAD_REQUEST for an invalid parameter, INTERNAL_SERVER_ERROR (carrying the
// status as JSON) when `_handle` fails, and OK with the body produced by `_handle` otherwise.
//
// NOTE(review): the JSON content type header is added before validation, while the
// BAD_REQUEST bodies below are plain text — confirm that this is intended.
void CompactionScoreAction::handle(HttpRequest* req) {
    req->add_output_header(HttpHeaders::CONTENT_TYPE, HttpHeaders::JsonType.data());
    auto top_n_param = req->param(TOP_N);

    size_t top_n = DEFAULT_TOP_N;
    if (!top_n_param.empty()) {
        try {
            size_t parsed_len = 0;
            const auto tmp_top_n = std::stoll(top_n_param, &parsed_len);
            // std::stoll stops at the first character that cannot be part of the number, so
            // without this check a value such as "10abc" would be silently accepted as 10.
            if (parsed_len != top_n_param.size()) {
                throw std::invalid_argument("`top_n` has trailing non-numeric characters");
            }
            if (tmp_top_n < 0) {
                throw std::invalid_argument("`top_n` cannot less than 0");
            }
            top_n = static_cast<size_t>(tmp_top_n);
        } catch (const std::exception& e) {
            // Covers both std::stoll failures (not a number / out of range) and the explicit
            // validation errors thrown above; the client always gets the same message.
            LOG(WARNING) << "convert failed:" << e.what();
            auto msg = fmt::format("invalid argument: top_n={}", top_n_param);
            HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, msg);
            return;
        }
    }

    auto sync_meta_param = req->param(SYNC_META);
    bool sync_meta = DEFAULT_SYNC_META;
    // Any value of `sync_meta` (even "false") is rejected outside cloud mode.
    if (!sync_meta_param.empty() && !config::is_cloud_mode()) {
        HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST,
                                "param `sync_meta` is only available for cloud mode");
        return;
    }
    if (sync_meta_param == "true") {
        sync_meta = true;
    } else if (sync_meta_param == "false") {
        sync_meta = false;
    } else if (!sync_meta_param.empty()) {
        auto msg = fmt::format("invalid argument: sync_meta={}", sync_meta_param);
        HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, msg);
        return;
    }

    std::string result;
    if (auto st = _handle(top_n, sync_meta, &result); !st) {
        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, st.to_json());
        return;
    }
    HttpChannel::send_reply(req, HttpStatus::OK, result);
}

// Produces the response body for the endpoint.
//
// When `sync_meta` is set, first runs `CloudCompactionScoresAccessor::sync_meta()`; the
// accessor is downcast without a runtime check beyond the DCHECK, so the caller must only
// request this in cloud mode. Then collects all scores, moves the `top_n` largest to the
// front in descending order, and writes them to `*result` as a pretty-printed JSON array.
// `top_n` is clamped to the number of available scores.
Status CompactionScoreAction::_handle(size_t top_n, bool sync_meta, std::string* result) {
    if (sync_meta) {
        DCHECK(config::is_cloud_mode());
        auto* cloud_accessor = static_cast<CloudCompactionScoresAccessor*>(_accessor.get());
        RETURN_IF_ERROR(cloud_accessor->sync_meta());
    }

    auto scores = _accessor->get_all_tablet_compaction_scores();
    const size_t limit = std::min(top_n, scores.size());
    const auto top_end = scores.begin() + limit;
    // Only the first `limit` entries need to be ordered; the rest are never emitted.
    std::partial_sort(scores.begin(), top_end, scores.end(), std::greater<>());

    rapidjson::Document root;
    root.SetArray();
    auto& allocator = root.GetAllocator();
    for (auto it = scores.begin(); it != top_end; ++it) {
        root.PushBack(jsonfy_tablet_compaction_score(*it, allocator), allocator);
    }

    rapidjson::StringBuffer json_buf;
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(json_buf);
    root.Accept(writer);
    *result = json_buf.GetString();
    return Status::OK();
}

} // namespace doris
66 changes: 66 additions & 0 deletions be/src/http/action/compaction_score_action.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <gen_cpp/FrontendService_types.h>

#include <cstddef>
#include <memory>
#include <string>

#include "cloud/cloud_tablet_mgr.h"
#include "common/status.h"
#include "http/http_handler_with_auth.h"
#include "http/http_request.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
namespace doris {

// One entry of the endpoint's output: a tablet id together with its compaction score.
struct CompactionScoreResult {
    int64_t tablet_id;
    size_t compaction_score;
};

// Compares two results by `compaction_score` only; `tablet_id` does not take part, so
// results with equal scores are neither greater than the other.
inline bool operator>(const CompactionScoreResult& lhs, const CompactionScoreResult& rhs) {
    return rhs.compaction_score < lhs.compaction_score;
}

// Interface through which CompactionScoreAction obtains compaction scores.
struct CompactionScoresAccessor {
virtual ~CompactionScoresAccessor() = default;

// Returns one CompactionScoreResult per tablet the implementation reports on.
virtual std::vector<CompactionScoreResult> get_all_tablet_compaction_scores() = 0;
};

// HTTP handler that reports per-tablet compaction scores as JSON.
// Supported query parameters: `top_n` and `sync_meta`.
class CompactionScoreAction : public HttpHandlerWithAuth {
public:
// Creates a handler that reads scores through a TabletManager.
explicit CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type type, TabletManager* tablet_mgr);

// Creates a handler that reads scores through a CloudTabletMgr.
explicit CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type type, CloudTabletMgr& tablet_mgr);

// Validates the query parameters of `req` and sends the reply.
void handle(HttpRequest* req) override;

private:
// Builds the response body into `*result` for the already-validated parameters.
Status _handle(size_t top_n, bool sync_meta, std::string* result);

// Source of the compaction scores; set by whichever constructor was used.
std::unique_ptr<CompactionScoresAccessor> _accessor;
};

} // namespace doris
9 changes: 9 additions & 0 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "olap/rowid_conversion.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/rowset/rowset_reader.h"
#include "olap/tablet_fwd.h"
#include "olap/txn_manager.h"
Expand Down Expand Up @@ -182,6 +183,14 @@ Status BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr& update_
return Status::OK();
}

// Returns the sum of `get_compaction_score()` over every rowset meta in `_tablet_meta`.
// NOTE(review): no lock is taken in this function while `_tablet_meta` is read — confirm
// whether callers are expected to hold the tablet's meta lock.
uint32_t BaseTablet::get_real_compaction_score() const {
    const auto& rs_metas = _tablet_meta->all_rs_metas();
    // Seed with an unsigned value: with a plain `0`, std::accumulate would carry the running
    // total as a signed `int` and convert uint32_t -> int -> uint32_t on every step.
    return std::accumulate(rs_metas.begin(), rs_metas.end(), uint32_t {0},
                           [](uint32_t score, const RowsetMetaSharedPtr& rs_meta) {
                               return score + rs_meta->get_compaction_score();
                           });
}

Status BaseTablet::capture_rs_readers_unlocked(const Versions& version_path,
std::vector<RowSetSplits>* rs_splits) const {
DCHECK(rs_splits != nullptr && rs_splits->empty());
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ class BaseTablet {

virtual size_t tablet_footprint() = 0;

// Returns the sum of the compaction scores of this tablet's rowsets.
// note(tsy): we should eventually unify the compaction score calculation
uint32_t get_real_compaction_score() const;

// MUST hold shared meta lock
Status capture_rs_readers_unlocked(const Versions& version_path,
std::vector<RowSetSplits>* rs_splits) const;
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,9 @@ uint32_t Tablet::calc_cold_data_compaction_score() const {

uint32_t Tablet::_calc_cumulative_compaction_score(
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy) {
if (cumulative_compaction_policy == nullptr) [[unlikely]] {
return 0;
}
#ifndef BE_TEST
if (_cumulative_compaction_policy == nullptr ||
_cumulative_compaction_policy->name() != cumulative_compaction_policy->name()) {
Expand Down
11 changes: 11 additions & 0 deletions be/src/service/http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <event2/bufferevent.h>
#include <event2/http.h>
#include <gen_cpp/FrontendService_types.h>

#include <string>
#include <vector>
Expand All @@ -38,6 +39,7 @@
#include "http/action/checksum_action.h"
#include "http/action/clear_cache_action.h"
#include "http/action/compaction_action.h"
#include "http/action/compaction_score_action.h"
#include "http/action/config_action.h"
#include "http/action/debug_point_action.h"
#include "http/action/download_action.h"
Expand Down Expand Up @@ -382,6 +384,11 @@ void HttpService::register_local_handler(StorageEngine& engine) {
new ShowNestedIndexFileAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
_ev_http_server->register_handler(HttpMethod::GET, "/api/show_nested_index_file",
show_nested_index_file_action);

CompactionScoreAction* compaction_score_action = _pool.add(new CompactionScoreAction(
_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, engine.tablet_manager()));
_ev_http_server->register_handler(HttpMethod::GET, "/api/compaction_score",
compaction_score_action);
}

void HttpService::register_cloud_handler(CloudStorageEngine& engine) {
Expand Down Expand Up @@ -423,6 +430,10 @@ void HttpService::register_cloud_handler(CloudStorageEngine& engine) {
new ShowNestedIndexFileAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
_ev_http_server->register_handler(HttpMethod::GET, "/api/show_nested_index_file",
show_nested_index_file_action);
CompactionScoreAction* compaction_score_action = _pool.add(new CompactionScoreAction(
_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, engine.tablet_mgr()));
_ev_http_server->register_handler(HttpMethod::GET, "/api/compaction_score",
compaction_score_action);
}
// NOLINTEND(readability-function-size)

Expand Down
Loading

0 comments on commit fcb8619

Please sign in to comment.