Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](clone) Fix clone and alter tablet use same tablet path #34889 #36791

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,36 @@ void DataDir::add_pending_ids(const std::string& id) {
_pending_path_ids.insert(id);
}

// gc unused local tablet dir
void DataDir::_perform_tablet_gc(const std::string& tablet_schema_hash_path, int16_t shard_id) {
if (_stop_bg_worker) {
return;
}

TTabletId tablet_id = -1;
TSchemaHash schema_hash = -1;
bool is_valid = TabletManager::get_tablet_id_and_schema_hash_from_path(
tablet_schema_hash_path, &tablet_id, &schema_hash);
if (!is_valid || tablet_id < 1 || schema_hash < 1) [[unlikely]] {
LOG(WARNING) << "[path gc] unknown path: " << tablet_schema_hash_path;
return;
}

auto tablet = _engine.tablet_manager()->get_tablet(tablet_id);
if (!tablet || tablet->data_dir() != this) {
if (tablet) {
LOG(INFO) << "The tablet in path " << tablet_schema_hash_path
<< " is not same with the running one: " << tablet->tablet_path()
<< ", might be the old tablet after migration, try to move it to trash";
}
_engine.tablet_manager()->try_delete_unused_tablet_path(this, tablet_id, schema_hash,
tablet_schema_hash_path, shard_id);
return;
}

_perform_rowset_gc(tablet_schema_hash_path);
}

void DataDir::remove_pending_ids(const std::string& id) {
std::lock_guard<std::shared_mutex> wr_lock(_pending_path_mutex);
_pending_path_ids.erase(id);
Expand Down Expand Up @@ -815,6 +845,14 @@ Status DataDir::perform_path_scan() {
fmt::format("{}/{}", tablet_schema_hash_path, rowset_file.file_name);
_all_check_paths.insert(rowset_file_path);
}
int16_t shard_id = -1;
try {
shard_id = std::stoi(shard.file_name);
} catch (const std::exception&) {
LOG(WARNING) << "failed to stoi shard_id, shard name=" << shard.file_name;
continue;
}
_perform_tablet_gc(tablet_id_path + '/' + schema_hash.file_name, shard_id);
}
}
}
Expand Down Expand Up @@ -947,16 +985,23 @@ Status DataDir::move_to_trash(const std::string& tablet_path) {
}

// 5. check parent dir of source file, delete it when empty
RETURN_IF_ERROR(delete_tablet_parent_path_if_empty(tablet_path));

return Status::OK();
}

Status DataDir::delete_tablet_parent_path_if_empty(const std::string& tablet_path) {
auto fs_tablet_path = io::Path(tablet_path);
std::string source_parent_dir = fs_tablet_path.parent_path(); // tablet_id level
std::vector<io::FileInfo> sub_files;
bool exists = true;
RETURN_IF_ERROR(
io::global_local_filesystem()->list(source_parent_dir, false, &sub_files, &exists));
if (sub_files.empty()) {
LOG(INFO) << "remove empty dir " << source_parent_dir;
// no need to exam return status
io::global_local_filesystem()->delete_directory(source_parent_dir);
}

return Status::OK();
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/data_dir.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ class DataDir {
// Move tablet to trash.
Status move_to_trash(const std::string& tablet_path);

static Status delete_tablet_parent_path_if_empty(const std::string& tablet_path);

private:
Status _init_cluster_id();
Status _init_capacity_and_create_shards();
Expand All @@ -174,6 +176,8 @@ class DataDir {

void _remove_check_paths(const std::set<std::string>& paths);

void _perform_tablet_gc(const std::string& tablet_schema_hash_path, int16_t shard_name);

bool _check_pending_ids(const std::string& id);

void _perform_path_gc_by_tablet();
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,8 @@ Status StorageEngine::_do_sweep(const std::string& scan_root, const time_t& loca
string path_name = sorted_path.string();
if (difftime(local_now, mktime(&local_tm_create)) >= actual_expire) {
res = io::global_local_filesystem()->delete_directory(path_name);
LOG(INFO) << "do sweep delete directory " << path_name << " local_now " << local_now
<< "actual_expire " << actual_expire << " res " << res;
if (!res.ok()) {
continue;
}
Expand Down
120 changes: 109 additions & 11 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include "runtime/memory/mem_tracker.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
#include "util/histogram.h"
#include "util/metrics.h"
Expand Down Expand Up @@ -537,19 +538,24 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId repl
<< ", is_drop_table_or_partition=" << is_drop_table_or_partition;
DorisMetrics::instance()->drop_tablet_requests_total->increment(1);

RETURN_IF_ERROR(register_transition_tablet(tablet_id, "drop tablet"));
Defer defer {[&]() { unregister_transition_tablet(tablet_id, "drop tablet"); }};

// Fetch tablet which need to be dropped
TabletSharedPtr to_drop_tablet = _get_tablet_unlocked(tablet_id);
if (to_drop_tablet == nullptr) {
LOG(WARNING) << "fail to drop tablet because it does not exist. "
<< "tablet_id=" << tablet_id;
return Status::OK();
}

// We should compare replica id to avoid dropping new cloned tablet.
// Iff request replica id is 0, FE may be an older release, then we drop this tablet as before.
if (to_drop_tablet->replica_id() != replica_id && replica_id != 0) {
return Status::Aborted("replica_id not match({} vs {})", to_drop_tablet->replica_id(),
replica_id);
}

_remove_tablet_from_partition(to_drop_tablet);
tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
tablet_map.erase(tablet_id);
Expand Down Expand Up @@ -1057,6 +1063,7 @@ Status TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>
}

Status TabletManager::start_trash_sweep() {
DBUG_EXECUTE_IF("TabletManager.start_trash_sweep.sleep", DBUG_BLOCK);
std::unique_lock<std::mutex> lock(_gc_tablets_lock, std::defer_lock);
if (!lock.try_lock()) {
return Status::OK();
Expand Down Expand Up @@ -1130,6 +1137,33 @@ Status TabletManager::start_trash_sweep() {
}

bool TabletManager::_move_tablet_to_trash(const TabletSharedPtr& tablet) {
RETURN_IF_ERROR(register_transition_tablet(tablet->tablet_id(), "move to trash"));
Defer defer {[&]() { unregister_transition_tablet(tablet->tablet_id(), "move to trash"); }};

TabletSharedPtr tablet_in_not_shutdown = get_tablet(tablet->tablet_id());
if (tablet_in_not_shutdown) {
TSchemaHash schema_hash_not_shutdown = tablet_in_not_shutdown->schema_hash();
size_t path_hash_not_shutdown = tablet_in_not_shutdown->data_dir()->path_hash();
if (tablet->schema_hash() == schema_hash_not_shutdown &&
tablet->data_dir()->path_hash() == path_hash_not_shutdown) {
tablet->clear_cache();
// shard_id in memory not eq shard_id in shutdown
if (tablet_in_not_shutdown->tablet_path() != tablet->tablet_path()) {
LOG(INFO) << "tablet path not eq shutdown tablet path, move it to trash, tablet_id="
<< tablet_in_not_shutdown->tablet_id()
<< " mem manager tablet path=" << tablet_in_not_shutdown->tablet_path()
<< " shutdown tablet path=" << tablet->tablet_path();
return tablet->data_dir()->move_to_trash(tablet->tablet_path());
} else {
LOG(INFO) << "tablet path eq shutdown tablet path, not move to trash, tablet_id="
<< tablet_in_not_shutdown->tablet_id()
<< " mem manager tablet path=" << tablet_in_not_shutdown->tablet_path()
<< " shutdown tablet path=" << tablet->tablet_path();
return true;
}
}
}

TabletMetaSharedPtr tablet_meta(new TabletMeta());
int64_t get_meta_ts = MonotonicMicros();
Status check_st = TabletMetaManager::get_meta(tablet->data_dir(), tablet->tablet_id(),
Expand Down Expand Up @@ -1197,6 +1231,15 @@ bool TabletManager::_move_tablet_to_trash(const TabletSharedPtr& tablet) {
return false;
}
if (exists) {
if (check_st.is<META_KEY_NOT_FOUND>()) {
LOG(INFO) << "could not find tablet meta in rocksdb, so just delete it path "
<< "tablet_id=" << tablet->tablet_id()
<< ", schema_hash=" << tablet->schema_hash()
<< ", delete tablet_path=" << tablet_path;
RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(tablet_path));
RETURN_IF_ERROR(DataDir::delete_tablet_parent_path_if_empty(tablet_path));
return true;
}
LOG(WARNING) << "errors while load meta from store, skip this tablet. "
<< "tablet_id=" << tablet->tablet_id()
<< ", schema_hash=" << tablet->schema_hash();
Expand All @@ -1211,21 +1254,68 @@ bool TabletManager::_move_tablet_to_trash(const TabletSharedPtr& tablet) {
}
}

bool TabletManager::register_clone_tablet(int64_t tablet_id) {
Status TabletManager::register_transition_tablet(int64_t tablet_id, std::string reason) {
tablets_shard& shard = _get_tablets_shard(tablet_id);
std::lock_guard<std::shared_mutex> wrlock(shard.lock);
return shard.tablets_under_clone.insert(tablet_id).second;
std::thread::id thread_id = std::this_thread::get_id();
std::lock_guard<std::mutex> lk(shard.lock_for_transition);
if (auto search = shard.tablets_under_transition.find(tablet_id);
search == shard.tablets_under_transition.end()) {
// not found
shard.tablets_under_transition[tablet_id] = std::make_tuple(reason, thread_id, 1);
LOG(INFO) << "add tablet_id= " << tablet_id << " to map, reason=" << reason
<< " lock times=1 thread_id_in_map=" << thread_id;
return Status::OK();
} else {
// found
auto& [r, thread_id_in_map, lock_times] = search->second;
if (thread_id != thread_id_in_map) {
// other thread, failed
LOG(INFO) << "tablet_id = " << tablet_id << " is doing " << r
<< " thread_id_in_map=" << thread_id_in_map << " , add reason=" << reason
<< " thread_id=" << thread_id;
return Status::InternalError<false>("{} failed try later, tablet_id={}", reason,
tablet_id);
}
// add lock times
++lock_times;
LOG(INFO) << "add tablet_id= " << tablet_id << " to map, reason=" << reason
<< " lock times=" << lock_times << " thread_id_in_map=" << thread_id_in_map;
return Status::OK();
}
}

void TabletManager::unregister_clone_tablet(int64_t tablet_id) {
void TabletManager::unregister_transition_tablet(int64_t tablet_id, std::string reason) {
tablets_shard& shard = _get_tablets_shard(tablet_id);
std::lock_guard<std::shared_mutex> wrlock(shard.lock);
shard.tablets_under_clone.erase(tablet_id);
std::thread::id thread_id = std::this_thread::get_id();
std::lock_guard<std::mutex> lk(shard.lock_for_transition);
if (auto search = shard.tablets_under_transition.find(tablet_id);
search == shard.tablets_under_transition.end()) {
// impossible, bug
DCHECK(false) << "tablet " << tablet_id
<< " must be found, before unreg must have been reg";
} else {
auto& [r, thread_id_in_map, lock_times] = search->second;
if (thread_id_in_map != thread_id) {
// impossible, bug
DCHECK(false) << "tablet " << tablet_id << " unreg thread must same reg thread";
}
// sub lock times
--lock_times;
if (lock_times != 0) {
LOG(INFO) << "erase tablet_id= " << tablet_id << " from map, reason=" << reason
<< " left=" << lock_times << " thread_id_in_map=" << thread_id_in_map;
} else {
LOG(INFO) << "erase tablet_id= " << tablet_id << " from map, reason=" << reason
<< " thread_id_in_map=" << thread_id_in_map;
shard.tablets_under_transition.erase(tablet_id);
}
}
}

void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId tablet_id,
SchemaHash schema_hash,
const string& schema_hash_path) {
const string& schema_hash_path,
int16_t shard_id) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
// acquire the read lock, so that there is no creating tablet or load tablet from meta tasks
// create tablet and load tablet task should check whether the dir exists
Expand All @@ -1235,13 +1325,21 @@ void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId t
// check if meta already exists
TabletMetaSharedPtr tablet_meta(new TabletMeta());
Status check_st = TabletMetaManager::get_meta(data_dir, tablet_id, schema_hash, tablet_meta);
if (check_st.ok()) {
LOG(INFO) << "tablet meta exists in meta store, skip delete the path " << schema_hash_path;
if (check_st.ok() && tablet_meta->shard_id() == shard_id) {
return;
}

if (shard.tablets_under_clone.count(tablet_id) > 0) {
LOG(INFO) << "tablet is under clone, skip delete the path " << schema_hash_path;
LOG(INFO) << "tablet meta not exists, try delete tablet path " << schema_hash_path;

bool succ = register_transition_tablet(tablet_id, "path gc");
if (!succ) {
return;
}
Defer defer {[&]() { unregister_transition_tablet(tablet_id, "path gc"); }};

TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id);
if (tablet != nullptr && tablet->tablet_path() == schema_hash_path) {
LOG(INFO) << "tablet , skip delete the path " << schema_hash_path;
return;
}

Expand Down
16 changes: 10 additions & 6 deletions be/src/olap/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ class TabletManager {
Status start_trash_sweep();

void try_delete_unused_tablet_path(DataDir* data_dir, TTabletId tablet_id,
SchemaHash schema_hash, const std::string& schema_hash_path);
SchemaHash schema_hash, const std::string& schema_hash_path,
int16_t shard_id);

void update_root_path_info(std::map<std::string, DataDirInfo>* path_map,
size_t* tablet_counter);
Expand All @@ -152,8 +153,8 @@ class TabletManager {
void obtain_specific_quantity_tablets(std::vector<TabletInfo>& tablets_info, int64_t num);

// return `true` if register success
bool register_clone_tablet(int64_t tablet_id);
void unregister_clone_tablet(int64_t tablet_id);
Status register_transition_tablet(int64_t tablet_id, std::string reason);
void unregister_transition_tablet(int64_t tablet_id, std::string reason);

void get_tablets_distribution_on_different_disks(
std::map<int64_t, std::map<DataDir*, int64_t>>& tablets_num_on_disk,
Expand Down Expand Up @@ -220,12 +221,15 @@ class TabletManager {
tablets_shard() = default;
tablets_shard(tablets_shard&& shard) {
tablet_map = std::move(shard.tablet_map);
tablets_under_clone = std::move(shard.tablets_under_clone);
tablets_under_transition = std::move(shard.tablets_under_transition);
}
// protect tablet_map, tablets_under_clone and tablets_under_restore
mutable std::shared_mutex lock;
tablet_map_t tablet_map;
std::set<int64_t> tablets_under_clone;
std::mutex lock_for_transition;
// tablet do clone, path gc, move to trash, disk migrate will record in tablets_under_transition
// tablet <reason, thread_id, lock_times>
std::map<int64_t, std::tuple<std::string, std::thread::id, int64_t>>
tablets_under_transition;
};

// trace the memory use by meta of tablet
Expand Down
Loading
Loading