diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 7433b781c65d99..8d132020a77708 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -658,8 +658,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx RowsetSharedPtr rowset = txn_info->rowset; int64_t cur_version = rowset->start_version(); // update delete bitmap info, in order to avoid recalculation when trying again - _engine.txn_delete_bitmap_cache().update_tablet_txn_info( - txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE); + RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info( + txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE)); if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update && rowset_writer->num_rows() > 0) { @@ -684,9 +684,9 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason, // it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do // delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail - _engine.txn_delete_bitmap_cache().update_tablet_txn_info(txn_id, tablet_id(), delete_bitmap, - cur_rowset_ids, PublishStatus::SUCCEED, - txn_info->publish_info); + RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info( + txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED, + txn_info->publish_info)); return Status::OK(); } diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp index c6a3b54edc3f67..4ea2699bdd9174 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp @@ -23,6 +23,7 @@ #include #include +#include "cloud/config.h" #include "common/status.h" #include "cpp/sync_point.h" #include "olap/olap_common.h" @@ -119,12 +120,11 @@ void CloudTxnDeleteBitmapCache::set_tablet_txn_info( TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids, RowsetSharedPtr rowset, int64_t txn_expiration, std::shared_ptr partial_update_info) { - if (txn_expiration <= 0) { - txn_expiration = duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count() + - 120; - } + int64_t txn_expiration_min = + duration_cast(std::chrono::system_clock::now().time_since_epoch()) + .count() + + config::tablet_txn_info_min_expired_seconds; + txn_expiration = std::max(txn_expiration_min, txn_expiration); { std::unique_lock wlock(_rwlock); TxnKey txn_key(transaction_id, tablet_id); @@ -153,16 +153,21 @@ void CloudTxnDeleteBitmapCache::set_tablet_txn_info( .tag("delete_bitmap_size", charge); } -void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transaction_id, - int64_t tablet_id, - DeleteBitmapPtr delete_bitmap, - const RowsetIdUnorderedSet& rowset_ids, - PublishStatus publish_status, - TxnPublishInfo publish_info) { +Status CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transaction_id, + int64_t tablet_id, + DeleteBitmapPtr delete_bitmap, + const RowsetIdUnorderedSet& rowset_ids, + PublishStatus publish_status, + TxnPublishInfo publish_info) { { std::unique_lock wlock(_rwlock); TxnKey txn_key(transaction_id, tablet_id); - CHECK(_txn_map.contains(txn_key)); + if (!_txn_map.contains(txn_key)) { + return Status::Error( + "not found txn info, tablet_id={}, transaction_id={}, may be expired and be " + "removed", + tablet_id, transaction_id); + } TxnVal& txn_val = _txn_map[txn_key]; *(txn_val.publish_status) = publish_status; if (publish_status == PublishStatus::SUCCEED) { @@ -184,7 +189,9 @@ void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transactio LOG_INFO("update txn related delete bitmap") .tag("txn_id", transaction_id) .tag("tablt_id", tablet_id) - .tag("delete_bitmap_size", charge); + .tag("delete_bitmap_size", charge) + .tag("publish_status", static_cast(publish_status)); + return Status::OK(); } void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() { @@ -238,7 +245,8 @@ void CloudTxnDeleteBitmapCache::remove_unused_tablet_txn_info(TTransactionId tra void CloudTxnDeleteBitmapCache::_clean_thread_callback() { do { remove_expired_tablet_txn_info(); - } while (!_stop_latch.wait_for(std::chrono::seconds(300))); + } while (!_stop_latch.wait_for( + std::chrono::seconds(config::remove_expired_tablet_txn_info_interval_seconds))); } } // namespace doris \ No newline at end of file diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.h b/be/src/cloud/cloud_txn_delete_bitmap_cache.h index 75577ae2e3fee0..db5f8867263168 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.h +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.h @@ -50,10 +50,10 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual { RowsetSharedPtr rowset, int64_t txn_expirationm, std::shared_ptr partial_update_info); - void update_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id, - DeleteBitmapPtr delete_bitmap, - const RowsetIdUnorderedSet& rowset_ids, - PublishStatus publish_status, TxnPublishInfo publish_info = {}); + Status update_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id, + DeleteBitmapPtr delete_bitmap, + const RowsetIdUnorderedSet& rowset_ids, + PublishStatus publish_status, TxnPublishInfo publish_info = {}); void remove_expired_tablet_txn_info(); diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index 0f59b51059b69e..44f9fa42cae0ba 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -68,6 +68,10 @@ DEFINE_mBool(enable_new_tablet_do_compaction, "false"); DEFINE_Bool(enable_cloud_txn_lazy_commit, "false"); +DEFINE_mInt32(remove_expired_tablet_txn_info_interval_seconds, "300"); + +DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120"); + void set_cloud_unique_id(std::string instance_id) { if (cloud_unique_id.empty() && !instance_id.empty()) { static_cast(set_config("cloud_unique_id", "1:" + instance_id + ":compute", true)); diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index 57f6348df7067b..ba20bccbcc7876 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -103,4 +103,8 @@ DECLARE_mInt32(sync_load_for_tablets_thread); // enable large txn lazy commit in meta-service `commit_txn` DECLARE_mBool(enable_cloud_txn_lazy_commit); +DECLARE_mInt32(remove_expired_tablet_txn_info_interval_seconds); + +DECLARE_mInt32(tablet_txn_info_min_expired_seconds); + } // namespace doris::config