Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](clone) Fix clone and alter tablet use same tablet path #34889

Merged
merged 12 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,8 @@ Status StorageEngine::_do_sweep(const std::string& scan_root, const time_t& loca
string path_name = sorted_path.string();
if (difftime(local_now, mktime(&local_tm_create)) >= actual_expire) {
res = io::global_local_filesystem()->delete_directory(path_name);
LOG(INFO) << "do sweep delete directory " << path_name << " local_now " << local_now
<< "actual_expire " << actual_expire << " res " << res;
if (!res.ok()) {
continue;
}
Expand Down
89 changes: 72 additions & 17 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include "runtime/memory/mem_tracker.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
#include "util/histogram.h"
#include "util/metrics.h"
Expand Down Expand Up @@ -522,9 +523,6 @@ Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id,
bool is_drop_table_or_partition) {
auto& shard = _get_tablets_shard(tablet_id);
std::lock_guard wrlock(shard.lock);
if (shard.tablets_under_clone.count(tablet_id) > 0) {
return Status::Aborted("tablet {} is under clone, skip drop task", tablet_id);
}
return _drop_tablet_unlocked(tablet_id, replica_id, false, is_drop_table_or_partition);
}

Expand All @@ -542,6 +540,22 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId repl
<< "tablet_id=" << tablet_id;
return Status::OK();
}

TSchemaHash schema_hash = to_drop_tablet->schema_hash();
deardeng marked this conversation as resolved.
Show resolved Hide resolved
size_t path_hash = to_drop_tablet->data_dir()->path_hash();

tablets_shard& shard = _get_tablets_shard(tablet_id);
auto [it, can_do] = shard.tablets_under_transition.insert(std::pair(TabletSchemaPath(tablet_id, schema_hash, path_hash), "drop tablet"));
if (!can_do) {
LOG(INFO) << "other op " << it->second << " effect this tablet_id = " << tablet_id << " schema_hash=" << schema_hash << " pash_hash=" << path_hash;
return Status::InternalError<false>("drop tablet failed try later, tablet_id={}, schema_hash={}, path_hash={}", tablet_id, schema_hash, path_hash);
}
LOG(INFO) << "add tablet_id= " << tablet_id << " schema_hash=" << schema_hash << " path_hash=" << path_hash << " drop tablet to map";
Defer defer {[&]() {
LOG(INFO) << "erase tablet_id= " << tablet_id << " schema_hash=" << schema_hash << " path_hash=" << path_hash << " drop tablet from map";
shard.tablets_under_transition.erase(TabletSchemaPath(tablet_id, schema_hash, path_hash));
}};

// We should compare replica id to avoid dropping new cloned tablet.
// Iff request replica id is 0, FE may be an older release, then we drop this tablet as before.
if (to_drop_tablet->replica_id() != replica_id && replica_id != 0) {
Expand Down Expand Up @@ -1068,6 +1082,7 @@ void TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>*
}

Status TabletManager::start_trash_sweep() {
DBUG_EXECUTE_IF("TabletManager.start_trash_sweep.sleep", DBUG_BLOCK);
std::unique_lock<std::mutex> lock(_gc_tablets_lock, std::defer_lock);
if (!lock.try_lock()) {
return Status::OK();
Expand All @@ -1076,26 +1091,20 @@ Status TabletManager::start_trash_sweep() {
for_each_tablet([](const TabletSharedPtr& tablet) { tablet->delete_expired_stale_rowset(); },
filter_all_tablets);

std::list<TabletSharedPtr>::iterator last_it;
{
std::shared_lock rdlock(_shutdown_tablets_lock);
last_it = _shutdown_tablets.begin();
if (last_it == _shutdown_tablets.end()) {
return Status::OK();
}
}
std::list<TabletSharedPtr> stage_tablets;

auto get_batch_tablets = [this, &last_it](int limit) {
auto get_batch_tablets = [this, &stage_tablets](int limit) {
std::vector<TabletSharedPtr> batch_tablets;
std::lock_guard<std::shared_mutex> wrdlock(_shutdown_tablets_lock);
while (last_it != _shutdown_tablets.end() && batch_tablets.size() < limit) {
auto it = _shutdown_tablets.begin();
while (it != _shutdown_tablets.end() && batch_tablets.size() < limit) {
// it means current tablet is referenced by other thread
if (last_it->use_count() > 1) {
last_it++;
if (it->use_count() > 1) {
stage_tablets.push_back(*it);
} else {
batch_tablets.push_back(*last_it);
last_it = _shutdown_tablets.erase(last_it);
batch_tablets.push_back(*it);
}
it = _shutdown_tablets.erase(it);
}

return batch_tablets;
Expand Down Expand Up @@ -1131,6 +1140,11 @@ Status TabletManager::start_trash_sweep() {
#endif
}

if (!stage_tablets.empty()) {
std::lock_guard<std::shared_mutex> wrlock(_shutdown_tablets_lock);
_shutdown_tablets.splice(_shutdown_tablets.end(), stage_tablets);
}

if (!failed_tablets.empty()) {
std::lock_guard<std::shared_mutex> wrlock(_shutdown_tablets_lock);
_shutdown_tablets.splice(_shutdown_tablets.end(), failed_tablets);
Expand All @@ -1140,6 +1154,15 @@ Status TabletManager::start_trash_sweep() {
}

bool TabletManager::_move_tablet_to_trash(const TabletSharedPtr& tablet) {
TabletSharedPtr tablet_in_not_shutdown = get_tablet(tablet->tablet_id());
if (tablet_in_not_shutdown) {
TSchemaHash schema_hash_not_shutdown = tablet_in_not_shutdown->schema_hash();
size_t path_hash_not_shutdown = tablet_in_not_shutdown->data_dir()->path_hash();
if (tablet->schema_hash() == schema_hash_not_shutdown && tablet->data_dir()->path_hash() == path_hash_not_shutdown) {
tablet->clear_cache();
deardeng marked this conversation as resolved.
Show resolved Hide resolved
}
return true;
deardeng marked this conversation as resolved.
Show resolved Hide resolved
}
TabletMetaSharedPtr tablet_meta(new TabletMeta());
int64_t get_meta_ts = MonotonicMicros();
Status check_st = TabletMetaManager::get_meta(tablet->data_dir(), tablet->tablet_id(),
Expand Down Expand Up @@ -1207,6 +1230,14 @@ bool TabletManager::_move_tablet_to_trash(const TabletSharedPtr& tablet) {
return false;
}
if (exists) {
if (check_st.is<META_KEY_NOT_FOUND>()) {
LOG(INFO) << "could not find tablet meta in rocksdb, so just delete it path "
<< "tablet_id=" << tablet->tablet_id()
<< ", schema_hash=" << tablet->schema_hash()
<< ", delete tablet_path=" << tablet_path;
RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(tablet_path));
deardeng marked this conversation as resolved.
Show resolved Hide resolved
return true;
}
LOG(WARNING) << "errors while load meta from store, skip this tablet. "
<< "tablet_id=" << tablet->tablet_id()
<< ", schema_hash=" << tablet->schema_hash();
Expand All @@ -1233,6 +1264,18 @@ void TabletManager::unregister_clone_tablet(int64_t tablet_id) {
shard.tablets_under_clone.erase(tablet_id);
}

std::pair<std::map<doris::TabletSchemaPath, std::string>::iterator, bool> TabletManager::register_transition_tablet(int64_t tablet_id, TSchemaHash schema_hash, size_t path_hash, std::string reason) {
deardeng marked this conversation as resolved.
Show resolved Hide resolved
tablets_shard& shard = _get_tablets_shard(tablet_id);
std::lock_guard<std::shared_mutex> wrlock(shard.lock);
return shard.tablets_under_transition.insert(std::pair(TabletSchemaPath(tablet_id, schema_hash, path_hash), reason));
}

void TabletManager::unregister_transition_tablet(int64_t tablet_id, TSchemaHash schema_hash, size_t path_hash) {
tablets_shard& shard = _get_tablets_shard(tablet_id);
std::lock_guard<std::shared_mutex> wrlock(shard.lock);
shard.tablets_under_transition.erase(TabletSchemaPath(tablet_id, schema_hash, path_hash));
}

void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId tablet_id,
SchemaHash schema_hash,
const string& schema_hash_path) {
Expand All @@ -1249,6 +1292,18 @@ void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId t
return;
}

size_t path_hash = data_dir->path_hash();
auto [it, can_do] = shard.tablets_under_transition.insert(std::pair(TabletSchemaPath(tablet_id, schema_hash, path_hash), "path gc"));
if (!can_do) {
LOG(INFO) << "other op " << it->second << " effect this tablet_id = " << tablet_id << " schema_hash=" << schema_hash << " pash_hash=" << path_hash;
return;
}
LOG(INFO) << "add tablet_id= " << tablet_id << " schema_hash=" << schema_hash << " path_hash=" << path_hash << " path gc tablet to map";
Defer defer {[&]() {
LOG(INFO) << "erase tablet_id= " << tablet_id << " schema_hash=" << schema_hash << " path_hash=" << path_hash << " path gc tablet from map";
shard.tablets_under_transition.erase(TabletSchemaPath(tablet_id, schema_hash, path_hash));
}};

if (shard.tablets_under_clone.count(tablet_id) > 0) {
LOG(INFO) << "tablet is under clone, skip delete the path " << schema_hash_path;
return;
Expand Down
43 changes: 43 additions & 0 deletions be/src/olap/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,39 @@ class TCreateTabletReq;
class TTablet;
class TTabletInfo;

// Identifies one tablet instance by the triple (tablet_id, schema_hash, path_hash).
//
// The type is a plain value: it is ordered lexicographically on
// (tablet_id, schema_hash, path_hash) so it can be used as a std::map key.
//
// It follows the rule of zero: no copy/move special member is user-declared, so
// the compiler generates all of them. (A hand-written move constructor here would
// implicitly delete the copy constructor and both assignment operators.)
class TabletSchemaPath {
private:
    // Value-initialized so that a default-constructed object never carries
    // indeterminate values into operator== / operator<.
    TTabletId tablet_id {};
    TSchemaHash schema_hash {};
    size_t path_hash {};

public:
    TabletSchemaPath() = default;

    TabletSchemaPath(TTabletId tablet_id, TSchemaHash schema_hash, size_t path_hash)
            : tablet_id(tablet_id), schema_hash(schema_hash), path_hash(path_hash) {}

    // Two keys are equal iff all three components are equal.
    bool operator==(const TabletSchemaPath& other) const {
        return tablet_id == other.tablet_id && schema_hash == other.schema_hash &&
               path_hash == other.path_hash;
    }

    // Strict weak ordering: compare tablet_id first, then schema_hash, then path_hash.
    bool operator<(const TabletSchemaPath& other) const {
        if (tablet_id != other.tablet_id) {
            return tablet_id < other.tablet_id;
        }
        if (schema_hash != other.schema_hash) {
            return schema_hash < other.schema_hash;
        }
        return path_hash < other.path_hash;
    }
};

// TabletManager provides get, add, delete tablet method for storage engine
// NOTE: If you want to add a method that needs to hold meta-lock before you can call it,
// please uniformly name the method in "xxx_unlocked()" mode
Expand Down Expand Up @@ -162,6 +195,13 @@ class TabletManager {
bool register_clone_tablet(int64_t tablet_id);
void unregister_clone_tablet(int64_t tablet_id);

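// Returns the result of inserting into the shard's tablets_under_transition map:
// `.second` is true if the registration succeeded; otherwise `.first` points at
// the entry (and its reason string) that is already registered for this tablet path.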
std::pair<std::map<TabletSchemaPath, std::string>::iterator, bool> register_transition_tablet(int64_t tablet_id, TSchemaHash schema_hash, size_t path_hash, std::string reason);
void unregister_transition_tablet(int64_t tablet_id, TSchemaHash schema_hash, size_t path_hash);

std::pair<std::map<TabletSchemaPath, std::string>::iterator, bool> register_transition_tablet_nolock(int64_t tablet_id, TSchemaHash schema_hash, size_t path_hash, std::string reason);
void unregister_transition_tablet_nolock(int64_t tablet_id, TSchemaHash schema_hash, size_t path_hash);

void get_tablets_distribution_on_different_disks(
std::map<int64_t, std::map<DataDir*, int64_t>>& tablets_num_on_disk,
std::map<int64_t, std::map<DataDir*, std::vector<TabletSize>>>& tablets_info_on_disk);
Expand Down Expand Up @@ -232,11 +272,14 @@ class TabletManager {
tablets_shard(tablets_shard&& shard) {
tablet_map = std::move(shard.tablet_map);
tablets_under_clone = std::move(shard.tablets_under_clone);
tablets_under_transition = std::move(shard.tablets_under_transition);
}
// protect tablet_map, tablets_under_clone, tablets_under_restore and tablets_under_transition
mutable std::shared_mutex lock;
tablet_map_t tablet_map;
std::set<int64_t> tablets_under_clone;
// Tablets that are being cloned, path-gc'ed, moved to trash or migrated between disks
// are recorded in tablets_under_transition; the mapped string names the operation.
deardeng marked this conversation as resolved.
Show resolved Hide resolved
std::map<TabletSchemaPath, std::string> tablets_under_transition;
};

struct Partition {
Expand Down
14 changes: 14 additions & 0 deletions be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,20 @@ Status EngineCloneTask::_do_clone() {
// Check local tablet exist or not
TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(_clone_req.tablet_id);

TSchemaHash schema_hash = tablet->schema_hash();
size_t path_hash = tablet->data_dir()->path_hash();

auto [it, can_do]= _engine.tablet_manager()->register_transition_tablet(_clone_req.tablet_id, schema_hash, path_hash, "clone");
if (!can_do) {
LOG(INFO) << "other op " << it->second << " effect this tablet_id = " << _clone_req.tablet_id << " schema_hash=" << schema_hash << " pash_hash=" << path_hash;
return Status::InternalError<false>("clone tablet failed try later, tablet_id={}, schema_hash={}, path_hash={}", _clone_req.tablet_id , schema_hash, path_hash);
}
LOG(INFO) << "add tablet_id= " << _clone_req.tablet_id << " schema_hash=" << schema_hash << " path_hash=" << path_hash << " clone tablet to map";
Defer defer {[&]() {
LOG(INFO) << "erase tablet_id= " << _clone_req.tablet_id << " schema_hash=" << schema_hash << " path_hash=" << path_hash << " clone tablet from map";
_engine.tablet_manager()->unregister_transition_tablet(_clone_req.tablet_id, schema_hash, path_hash);
}};

// The status of a tablet is not ready, indicating that it is a residual tablet after a schema
// change failure. Clone a new tablet from remote be to overwrite it. This situation basically only
// occurs when the be_rebalancer_fuzzy_test configuration is enabled.
Expand Down
15 changes: 15 additions & 0 deletions be/src/olap/task/engine_storage_migration_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,21 @@ Status EngineStorageMigrationTask::_migrate() {
int64_t tablet_id = _tablet->tablet_id();
LOG(INFO) << "begin to process tablet migrate. "
<< "tablet_id=" << tablet_id << ", dest_store=" << _dest_store->path();

TSchemaHash schema_hash = _tablet->schema_hash();
size_t path_hash = _tablet->data_dir()->path_hash();

auto [it, can_do] = _engine.tablet_manager()->register_transition_tablet(_tablet->tablet_id(), schema_hash, path_hash, "disk migrate");
if (!can_do) {
LOG(INFO) << "other op " << it->second << " effect this tablet_id = " << tablet_id << " schema_hash=" << schema_hash << " pash_hash=" << path_hash;
return Status::InternalError<false>("disk migrate tablet failed try later, tablet_id={}, schema_hash={}, path_hash={}", tablet_id, schema_hash, path_hash);
}
LOG(INFO) << "add tablet_id= " << tablet_id << " schema_hash=" << schema_hash << " path_hash=" << path_hash << " disk migrate tablet to map";
Defer defer {[&]() {
LOG(INFO) << "erase tablet_id= " << tablet_id << " schema_hash=" << schema_hash << " path_hash=" << path_hash << " disk migrate tablet from map";
_engine.tablet_manager()->unregister_transition_tablet(_tablet->tablet_id(), schema_hash, path_hash);
}};


DorisMetrics::instance()->storage_migrate_requests_total->increment(1);
int32_t start_version = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.doris.regression.suite.ClusterOptions
import org.junit.Assert

// Regression suite: on a 3-BE docker cluster, creates a 10-bucket table with
// 3 replicas, loads 100 rows while the BE debug point
// "TabletManager.start_trash_sweep.sleep" is enabled, then lowers replication_num
// to 2 and raises it back to 3, waiting each time until SHOW TABLETS reports the
// expected number of replicas with a uniform Version and State.
// NOTE(review): judging by the suite name this targets a race between dropping a
// replica and cloning it back onto the same tablet path -- confirm against the PR.
suite('test_drop_clone_tablet_path_race') {
// Nothing to do in cloud mode; the suite returns immediately.
if (isCloudMode()) {
return
}
def options = new ClusterOptions()
// Debug points must be enabled so the BE-side debug point below can be toggled.
options.enableDebugPoints()
options.feConfigs += [
'tablet_checker_interval_ms=100',
'schedule_slot_num_per_hdd_path=1000',
'storage_high_watermark_usage_percent=99',
'storage_flood_stage_usage_percent=99',
]
options.beNum = 3
docker(options) {
def table = "t1"
// Polls SHOW TABLETS once per second, up to 120 times, until exactly `size`
// rows are returned and every row has the same Version and State as the
// first row. Fails the test if that never happens.
def checkFunc = {size ->
boolean succ = false
for (int i = 0; i < 120; i++) {
def result = sql_return_maparray """SHOW TABLETS FROM ${table}"""
if (result.size() == size) {
def version = result[0].Version
def state = result[0].State
succ = result.every { it.Version.equals(version) && it.State.equals(state) }
if (succ) {
break
}
}
sleep(1000)
}
Assert.assertTrue(succ)
}

sql """DROP TABLE IF EXISTS ${table}"""
sql """
CREATE TABLE `${table}` (
`id` int(11) NULL,
`name` varchar(255) NULL,
`score` int(11) SUM NULL
) ENGINE=OLAP
AGGREGATE KEY(`id`, `name`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
'replication_num' = '3'
);
"""

try {
// 10h
// NOTE(review): presumably the debug point blocks start_trash_sweep for about
// 10 hours, i.e. for the whole test -- confirm against DBUG_BLOCK.
GetDebugPoint().enableDebugPointForAllBEs("TabletManager.start_trash_sweep.sleep")
// Load 100 single-row inserts so the tablets have data and advancing versions.
for(int i= 0; i < 100; ++i) {
sql """INSERT INTO ${table} values (${i}, "${i}str", ${i} * 100)"""
}

sql """ALTER TABLE ${table} MODIFY PARTITION(${table}) SET ("replication_num" = "2")"""

// 10 buckets x 2 replicas = 20 rows expected from SHOW TABLETS.
checkFunc(20)

sql """ALTER TABLE ${table} MODIFY PARTITION(${table}) SET ("replication_num" = "3")"""
// 10 buckets x 3 replicas = 30 rows expected once the third replica is back.
checkFunc(30)
} finally {
// Always turn the debug point off again, even if an assertion above failed.
GetDebugPoint().disableDebugPointForAllBEs("TabletManager.start_trash_sweep.sleep")
}
}
}
Loading