Skip to content

Commit

Permalink
[branch-2.0](migrate disk) fix migrate disk lost data during publish …
Browse files Browse the repository at this point in the history
…version #29887 (#30546)
  • Loading branch information
yujun777 authored Jan 31, 2024
1 parent 3468b32 commit beeac85
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 18 deletions.
2 changes: 1 addition & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ void TaskWorkerPool::_finish_task(const TFinishTaskRequest& finish_task_request)
.error(result.status);
try_time += 1;
}
sleep(config::sleep_one_second);
sleep(1);
}
}

Expand Down
2 changes: 0 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,6 @@ DEFINE_mInt32(max_download_speed_kbps, "50000");
DEFINE_mInt32(download_low_speed_limit_kbps, "50");
// download low speed time(seconds)
DEFINE_mInt32(download_low_speed_time, "300");
// sleep time for one second
DEFINE_Int32(sleep_one_second, "1");

// log dir
DEFINE_String(sys_log_dir, "${DORIS_HOME}/log");
Expand Down
2 changes: 0 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,6 @@ DECLARE_mInt32(max_download_speed_kbps);
DECLARE_mInt32(download_low_speed_limit_kbps);
// download low speed time(seconds)
DECLARE_mInt32(download_low_speed_time);
// sleep time for one second
DECLARE_Int32(sleep_one_second);

// log dir
DECLARE_String(sys_log_dir);
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class Tablet : public BaseTablet {
std::mutex& get_base_compaction_lock() { return _base_compaction_lock; }
std::mutex& get_cumulative_compaction_lock() { return _cumulative_compaction_lock; }

std::shared_mutex& get_migration_lock() { return _migration_lock; }
std::shared_timed_mutex& get_migration_lock() { return _migration_lock; }

std::mutex& get_schema_change_lock() { return _schema_change_lock; }

Expand Down Expand Up @@ -635,7 +635,7 @@ class Tablet : public BaseTablet {
std::mutex _base_compaction_lock;
std::mutex _cumulative_compaction_lock;
std::mutex _schema_change_lock;
std::shared_mutex _migration_lock;
std::shared_timed_mutex _migration_lock;
std::mutex _build_inverted_index_lock;

// TODO(lingbin): There is a _meta_lock TabletMeta too, there should be a comment to
Expand Down
17 changes: 17 additions & 0 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,13 @@ TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task
}

void TabletPublishTxnTask::handle() {
std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::chrono::seconds(5));
if (!migration_rlock.owns_lock()) {
_result = Status::Error<TRY_LOCK_FAILED, false>("got migration_rlock failed");
LOG(WARNING) << "failed to publish version. tablet_id=" << _tablet_info.tablet_id
<< ", txn_id=" << _transaction_id << ", res=" << _result;
return;
}
std::unique_lock<std::mutex> rowset_update_lock(_tablet->get_rowset_update_lock(),
std::defer_lock);
if (_tablet->enable_unique_key_merge_on_write()) {
Expand All @@ -352,6 +359,8 @@ void TabletPublishTxnTask::handle() {
return;
}

DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.block_add_rowsets", DBUG_BLOCK);

// add visible rowset to tablet
int64_t t1 = MonotonicMicros();
_result = _tablet->add_inc_rowset(_rowset);
Expand All @@ -377,6 +386,12 @@ void TabletPublishTxnTask::handle() {
}

void AsyncTabletPublishTask::handle() {
std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::chrono::seconds(5));
if (!migration_rlock.owns_lock()) {
LOG(WARNING) << "failed to publish version. tablet_id=" << _tablet->tablet_id()
<< ", txn_id=" << _transaction_id << ", got migration_rlock failed";
return;
}
std::lock_guard<std::mutex> wrlock(_tablet->get_rowset_update_lock());
_stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
Expand All @@ -398,6 +413,8 @@ void AsyncTabletPublishTask::handle() {
return;
}

DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.block_add_rowsets", DBUG_BLOCK);

// add visible rowset to tablet
int64_t t1 = MonotonicMicros();
publish_status = _tablet->add_inc_rowset(rowset);
Expand Down
20 changes: 10 additions & 10 deletions be/src/olap/task/engine_storage_migration_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ namespace doris {

using std::stringstream;

const int CHECK_TXNS_MAX_WAIT_TIME_SECS = 60;

EngineStorageMigrationTask::EngineStorageMigrationTask(const TabletSharedPtr& tablet,
DataDir* dest_store)
: _tablet(tablet), _dest_store(dest_store) {
Expand Down Expand Up @@ -114,25 +112,27 @@ Status EngineStorageMigrationTask::_check_running_txns() {
}

Status EngineStorageMigrationTask::_check_running_txns_until_timeout(
std::unique_lock<std::shared_mutex>* migration_wlock) {
std::unique_lock<std::shared_timed_mutex>* migration_wlock) {
// caller should not hold migration lock, and 'migration_wlock' should not be nullptr
// ownership of the migration_wlock is transferred to the caller if the check succeeds
DCHECK_NE(migration_wlock, nullptr);
Status res = Status::OK();
int try_times = 1;
do {
// to avoid invalid loops, the lock is guaranteed to be acquired here
{
std::unique_lock<std::shared_mutex> wlock(_tablet->get_migration_lock());
std::unique_lock<std::shared_timed_mutex> wlock(_tablet->get_migration_lock());
if (_tablet->tablet_state() == TABLET_SHUTDOWN) {
return Status::Error<ErrorCode::INTERNAL_ERROR, false>("tablet {} has deleted",
_tablet->tablet_id());
}
res = _check_running_txns();
if (res.ok()) {
// transfer the lock to the caller
*migration_wlock = std::move(wlock);
return res;
}
}
sleep(std::min(config::sleep_one_second * try_times, CHECK_TXNS_MAX_WAIT_TIME_SECS));
++try_times;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
} while (!_is_timeout());
return res;
}
Expand Down Expand Up @@ -214,8 +214,8 @@ Status EngineStorageMigrationTask::_migrate() {
uint64_t shard = 0;
std::string full_path;
{
std::unique_lock<std::shared_mutex> migration_wlock(_tablet->get_migration_lock(),
std::try_to_lock);
std::unique_lock<std::shared_timed_mutex> migration_wlock(_tablet->get_migration_lock(),
std::chrono::seconds(1));
if (!migration_wlock.owns_lock()) {
return Status::InternalError("could not own migration_wlock");
}
Expand Down Expand Up @@ -250,7 +250,7 @@ Status EngineStorageMigrationTask::_migrate() {
if (!res.ok()) {
break;
}
std::unique_lock<std::shared_mutex> migration_wlock;
std::unique_lock<std::shared_timed_mutex> migration_wlock;
res = _check_running_txns_until_timeout(&migration_wlock);
if (!res.ok()) {
break;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/task/engine_storage_migration_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class EngineStorageMigrationTask : public EngineTask {
Status _check_running_txns();
// caller should not hold migration lock, and 'migration_wlock' should not be nullptr
// ownership of the migration lock is transferred to the caller if the check succeeds
Status _check_running_txns_until_timeout(std::unique_lock<std::shared_mutex>* migration_wlock);
Status _check_running_txns_until_timeout(
std::unique_lock<std::shared_timed_mutex>* migration_wlock);

// if the size less than threshold, return true
bool _is_rowsets_size_less_than_threshold(
Expand Down
14 changes: 14 additions & 0 deletions be/src/util/debug_points.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

#include <atomic>
#include <boost/lexical_cast.hpp>
#include <chrono>
#include <functional>
#include <map>
#include <memory>
#include <thread>
#include <type_traits>

#include "common/compiler_util.h"
Expand All @@ -33,10 +35,22 @@
if (UNLIKELY(config::enable_debug_points)) { \
auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
if (dp) { \
[[maybe_unused]] auto DP_NAME = debug_point_name; \
code; \
} \
}

// Common debug actions, meant to be passed as the code argument of DBUG_EXECUTE_IF.
// Usage example: DBUG_EXECUTE_IF("xxx", DBUG_BLOCK);
//
// DBUG_BLOCK stalls the calling thread at the point where it is expanded:
//   1. logs "start debug block <DP_NAME>";
//   2. polls DebugPoints::instance()->is_enable(DP_NAME), sleeping 10 milliseconds
//      between polls, for as long as the call returns true;
//   3. logs "end debug block <DP_NAME>" and falls through to the following code.
//
// DP_NAME is not defined by this macro; it must already be in scope where the macro
// is expanded (presumably the local that DBUG_EXECUTE_IF declares from its
// debug_point_name argument -- confirm against that macro's definition).
// NOTE(review): there is no timeout in the loop, so the thread stays blocked until
// is_enable(DP_NAME) returns false.
#define DBUG_BLOCK \
{ \
LOG(INFO) << "start debug block " << DP_NAME; \
while (DebugPoints::instance()->is_enable(DP_NAME)) { \
std::this_thread::sleep_for(std::chrono::milliseconds(10)); \
} \
LOG(INFO) << "end debug block " << DP_NAME; \
}

namespace doris {

struct DebugPoint {
Expand Down

0 comments on commit beeac85

Please sign in to comment.