Skip to content

Commit

Permalink
edit
Browse files Browse the repository at this point in the history
  • Loading branch information
hust-hhb committed Sep 13, 2024
1 parent 5b55e56 commit 2312f3f
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 28 deletions.
10 changes: 5 additions & 5 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -658,8 +658,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
RowsetSharedPtr rowset = txn_info->rowset;
int64_t cur_version = rowset->start_version();
// update delete bitmap info, in order to avoid recalculation when trying again
_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE);
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE));

if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update &&
rowset_writer->num_rows() > 0) {
Expand All @@ -684,9 +684,9 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
// delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail
_engine.txn_delete_bitmap_cache().update_tablet_txn_info(txn_id, tablet_id(), delete_bitmap,
cur_rowset_ids, PublishStatus::SUCCEED,
txn_info->publish_info);
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
txn_info->publish_info));

return Status::OK();
}
Expand Down
37 changes: 18 additions & 19 deletions be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ void CloudTxnDeleteBitmapCache::set_tablet_txn_info(
int64_t txn_expiration_min =
duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
.count() +
config::remove_expired_tablet_txn_info_interval_seconds;
config::tablet_txn_info_min_expired_seconds;
txn_expiration = std::max(txn_expiration_min, txn_expiration);
{
std::unique_lock<std::shared_mutex> wlock(_rwlock);
Expand Down Expand Up @@ -153,7 +153,7 @@ void CloudTxnDeleteBitmapCache::set_tablet_txn_info(
.tag("delete_bitmap_size", charge);
}

void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transaction_id,
Status CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transaction_id,
int64_t tablet_id,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids,
Expand All @@ -162,7 +162,12 @@ void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transactio
{
std::unique_lock<std::shared_mutex> wlock(_rwlock);
TxnKey txn_key(transaction_id, tablet_id);
CHECK(_txn_map.contains(txn_key));
if (!_txn_map.contains(txn_key)) {
return Status::Error<ErrorCode::NOT_FOUND, false>(
"not found txn info, tablet_id={}, transaction_id={}, may be expired and be "
"removed",
tablet_id, transaction_id);
}
TxnVal& txn_val = _txn_map[txn_key];
*(txn_val.publish_status) = publish_status;
if (publish_status == PublishStatus::SUCCEED) {
Expand All @@ -186,32 +191,26 @@ void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transactio
.tag("tablt_id", tablet_id)
.tag("delete_bitmap_size", charge)
.tag("publish_status", static_cast<int>(publish_status));
return Status::OK();
}

void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() {
TEST_SYNC_POINT_RETURN_WITH_VOID("CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info");
std::unique_lock<std::shared_mutex> wlock(_rwlock);
for (auto it = _expiration_txn.begin(); it != _expiration_txn.end();) {
if (_txn_map.find(it->second) == _txn_map.end()) {
it = _expiration_txn.erase(it);
while (!_expiration_txn.empty()) {
auto iter = _expiration_txn.begin();
if (_txn_map.find(iter->second) == _txn_map.end()) {
_expiration_txn.erase(iter);
continue;
}
int64_t current_time = duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
if (it->first > current_time) {
if (iter->first > current_time) {
break;
}
if (*(_txn_map.find(it->second)->second.publish_status) != PublishStatus::SUCCEED) {
LOG(WARNING) << "process expired_tablet_txn_info, txn="
<< _txn_map.find(it->second)->first.txn_id << " publish_status is "
<< static_cast<int>(*(_txn_map.find(it->second)->second.publish_status))
<< ",skip to remove";
it++;
continue;
}
auto txn_iter = _txn_map.find(it->second);
if ((txn_iter != _txn_map.end()) && (it->first == txn_iter->second.txn_expiration)) {
auto txn_iter = _txn_map.find(iter->second);
if ((txn_iter != _txn_map.end()) && (iter->first == txn_iter->second.txn_expiration)) {
LOG_INFO("clean expired delete bitmap")
.tag("txn_id", txn_iter->first.txn_id)
.tag("expiration", txn_iter->second.txn_expiration)
Expand All @@ -220,9 +219,9 @@ void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() {
std::to_string(txn_iter->first.tablet_id); // Cache key container
CacheKey cache_key(key_str);
erase(cache_key);
_txn_map.erase(it->second);
_txn_map.erase(iter->second);
}
it = _expiration_txn.erase(it);
_expiration_txn.erase(iter);
}
}

Expand Down
8 changes: 4 additions & 4 deletions be/src/cloud/cloud_txn_delete_bitmap_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual {
RowsetSharedPtr rowset, int64_t txn_expirationm,
std::shared_ptr<PartialUpdateInfo> partial_update_info);

void update_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids,
PublishStatus publish_status, TxnPublishInfo publish_info = {});
Status update_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids,
PublishStatus publish_status, TxnPublishInfo publish_info = {});

void remove_expired_tablet_txn_info();

Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ DEFINE_Bool(enable_cloud_txn_lazy_commit, "false");

DEFINE_mInt32(remove_expired_tablet_txn_info_interval_seconds, "300");

DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120");

void set_cloud_unique_id(std::string instance_id) {
if (cloud_unique_id.empty() && !instance_id.empty()) {
static_cast<void>(set_config("cloud_unique_id", "1:" + instance_id + ":compute", true));
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,6 @@ DECLARE_mBool(enable_cloud_txn_lazy_commit);

DECLARE_mInt32(remove_expired_tablet_txn_info_interval_seconds);

DECLARE_mInt32(tablet_txn_info_min_expired_seconds);

} // namespace doris::config

0 comments on commit 2312f3f

Please sign in to comment.