Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](cloud-mow) make cloud_txn_delete_bitmap_cache's expired time more reasonable #40333

Merged
merged 3 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -658,8 +658,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
RowsetSharedPtr rowset = txn_info->rowset;
int64_t cur_version = rowset->start_version();
// update delete bitmap info, in order to avoid recalculation when trying again
_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE);
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE));

if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update &&
rowset_writer->num_rows() > 0) {
Expand All @@ -684,9 +684,9 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
// delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail
_engine.txn_delete_bitmap_cache().update_tablet_txn_info(txn_id, tablet_id(), delete_bitmap,
cur_rowset_ids, PublishStatus::SUCCEED,
txn_info->publish_info);
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
txn_info->publish_info));

return Status::OK();
}
Expand Down
38 changes: 23 additions & 15 deletions be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <memory>
#include <shared_mutex>

#include "cloud/config.h"
#include "common/status.h"
#include "cpp/sync_point.h"
#include "olap/olap_common.h"
Expand Down Expand Up @@ -119,12 +120,11 @@ void CloudTxnDeleteBitmapCache::set_tablet_txn_info(
TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids, RowsetSharedPtr rowset, int64_t txn_expiration,
std::shared_ptr<PartialUpdateInfo> partial_update_info) {
if (txn_expiration <= 0) {
txn_expiration = duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch())
.count() +
120;
}
int64_t txn_expiration_min =
duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
.count() +
config::tablet_txn_info_min_expired_seconds;
txn_expiration = std::max(txn_expiration_min, txn_expiration);
{
std::unique_lock<std::shared_mutex> wlock(_rwlock);
TxnKey txn_key(transaction_id, tablet_id);
Expand Down Expand Up @@ -153,16 +153,21 @@ void CloudTxnDeleteBitmapCache::set_tablet_txn_info(
.tag("delete_bitmap_size", charge);
}

void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transaction_id,
int64_t tablet_id,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids,
PublishStatus publish_status,
TxnPublishInfo publish_info) {
Status CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transaction_id,
int64_t tablet_id,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids,
PublishStatus publish_status,
TxnPublishInfo publish_info) {
{
std::unique_lock<std::shared_mutex> wlock(_rwlock);
TxnKey txn_key(transaction_id, tablet_id);
CHECK(_txn_map.contains(txn_key));
if (!_txn_map.contains(txn_key)) {
return Status::Error<ErrorCode::NOT_FOUND, false>(
"not found txn info, tablet_id={}, transaction_id={}, may be expired and be "
"removed",
tablet_id, transaction_id);
}
TxnVal& txn_val = _txn_map[txn_key];
*(txn_val.publish_status) = publish_status;
if (publish_status == PublishStatus::SUCCEED) {
Expand All @@ -184,7 +189,9 @@ void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transactio
LOG_INFO("update txn related delete bitmap")
.tag("txn_id", transaction_id)
.tag("tablt_id", tablet_id)
.tag("delete_bitmap_size", charge);
.tag("delete_bitmap_size", charge)
.tag("publish_status", static_cast<int>(publish_status));
return Status::OK();
}

void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() {
Expand Down Expand Up @@ -238,7 +245,8 @@ void CloudTxnDeleteBitmapCache::remove_unused_tablet_txn_info(TTransactionId tra
void CloudTxnDeleteBitmapCache::_clean_thread_callback() {
do {
remove_expired_tablet_txn_info();
} while (!_stop_latch.wait_for(std::chrono::seconds(300)));
} while (!_stop_latch.wait_for(
std::chrono::seconds(config::remove_expired_tablet_txn_info_interval_seconds)));
}

} // namespace doris
8 changes: 4 additions & 4 deletions be/src/cloud/cloud_txn_delete_bitmap_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual {
RowsetSharedPtr rowset, int64_t txn_expirationm,
std::shared_ptr<PartialUpdateInfo> partial_update_info);

void update_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids,
PublishStatus publish_status, TxnPublishInfo publish_info = {});
Status update_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids,
PublishStatus publish_status, TxnPublishInfo publish_info = {});

void remove_expired_tablet_txn_info();

Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ DEFINE_mBool(enable_new_tablet_do_compaction, "false");

DEFINE_Bool(enable_cloud_txn_lazy_commit, "false");

DEFINE_mInt32(remove_expired_tablet_txn_info_interval_seconds, "300");

DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120");

void set_cloud_unique_id(std::string instance_id) {
if (cloud_unique_id.empty() && !instance_id.empty()) {
static_cast<void>(set_config("cloud_unique_id", "1:" + instance_id + ":compute", true));
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,8 @@ DECLARE_mInt32(sync_load_for_tablets_thread);
// enable large txn lazy commit in meta-service `commit_txn`
DECLARE_mBool(enable_cloud_txn_lazy_commit);

DECLARE_mInt32(remove_expired_tablet_txn_info_interval_seconds);

DECLARE_mInt32(tablet_txn_info_min_expired_seconds);

} // namespace doris::config
Loading