Skip to content

Commit

Permalink
[fix](cloud-mow) check cloud_txn_delete_bitmap_cache's status before …
Browse files Browse the repository at this point in the history
…removing expired one
  • Loading branch information
hust-hhb committed Sep 13, 2024
1 parent 55d6d64 commit 5b55e56
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 17 deletions.
43 changes: 26 additions & 17 deletions be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <memory>
#include <shared_mutex>

#include "cloud/config.h"
#include "common/status.h"
#include "cpp/sync_point.h"
#include "olap/olap_common.h"
Expand Down Expand Up @@ -119,12 +120,11 @@ void CloudTxnDeleteBitmapCache::set_tablet_txn_info(
TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids, RowsetSharedPtr rowset, int64_t txn_expiration,
std::shared_ptr<PartialUpdateInfo> partial_update_info) {
if (txn_expiration <= 0) {
txn_expiration = duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch())
.count() +
120;
}
int64_t txn_expiration_min =
duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
.count() +
config::remove_expired_tablet_txn_info_interval_seconds;
txn_expiration = std::max(txn_expiration_min, txn_expiration);
{
std::unique_lock<std::shared_mutex> wlock(_rwlock);
TxnKey txn_key(transaction_id, tablet_id);
Expand Down Expand Up @@ -184,26 +184,34 @@ void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transactio
LOG_INFO("update txn related delete bitmap")
.tag("txn_id", transaction_id)
.tag("tablt_id", tablet_id)
.tag("delete_bitmap_size", charge);
.tag("delete_bitmap_size", charge)
.tag("publish_status", static_cast<int>(publish_status));
}

void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() {
TEST_SYNC_POINT_RETURN_WITH_VOID("CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info");
std::unique_lock<std::shared_mutex> wlock(_rwlock);
while (!_expiration_txn.empty()) {
auto iter = _expiration_txn.begin();
if (_txn_map.find(iter->second) == _txn_map.end()) {
_expiration_txn.erase(iter);
for (auto it = _expiration_txn.begin(); it != _expiration_txn.end();) {
if (_txn_map.find(it->second) == _txn_map.end()) {
it = _expiration_txn.erase(it);
continue;
}
int64_t current_time = duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
if (iter->first > current_time) {
if (it->first > current_time) {
break;
}
auto txn_iter = _txn_map.find(iter->second);
if ((txn_iter != _txn_map.end()) && (iter->first == txn_iter->second.txn_expiration)) {
if (*(_txn_map.find(it->second)->second.publish_status) != PublishStatus::SUCCEED) {
LOG(WARNING) << "process expired_tablet_txn_info, txn="
<< _txn_map.find(it->second)->first.txn_id << " publish_status is "
<< static_cast<int>(*(_txn_map.find(it->second)->second.publish_status))
<< ",skip to remove";
it++;
continue;
}
auto txn_iter = _txn_map.find(it->second);
if ((txn_iter != _txn_map.end()) && (it->first == txn_iter->second.txn_expiration)) {
LOG_INFO("clean expired delete bitmap")
.tag("txn_id", txn_iter->first.txn_id)
.tag("expiration", txn_iter->second.txn_expiration)
Expand All @@ -212,9 +220,9 @@ void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() {
std::to_string(txn_iter->first.tablet_id); // Cache key container
CacheKey cache_key(key_str);
erase(cache_key);
_txn_map.erase(iter->second);
_txn_map.erase(it->second);
}
_expiration_txn.erase(iter);
it = _expiration_txn.erase(it);
}
}

Expand All @@ -238,7 +246,8 @@ void CloudTxnDeleteBitmapCache::remove_unused_tablet_txn_info(TTransactionId tra
void CloudTxnDeleteBitmapCache::_clean_thread_callback() {
do {
remove_expired_tablet_txn_info();
} while (!_stop_latch.wait_for(std::chrono::seconds(300)));
} while (!_stop_latch.wait_for(
std::chrono::seconds(config::remove_expired_tablet_txn_info_interval_seconds)));
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ DEFINE_mBool(enable_new_tablet_do_compaction, "false");

DEFINE_Bool(enable_cloud_txn_lazy_commit, "false");

DEFINE_mInt32(remove_expired_tablet_txn_info_interval_seconds, "300");

void set_cloud_unique_id(std::string instance_id) {
if (cloud_unique_id.empty() && !instance_id.empty()) {
static_cast<void>(set_config("cloud_unique_id", "1:" + instance_id + ":compute", true));
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,6 @@ DECLARE_mInt32(sync_load_for_tablets_thread);
// enable large txn lazy commit in meta-service `commit_txn`
DECLARE_mBool(enable_cloud_txn_lazy_commit);

DECLARE_mInt32(remove_expired_tablet_txn_info_interval_seconds);

} // namespace doris::config

0 comments on commit 5b55e56

Please sign in to comment.