Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhancement](cloud) clarify codes and make TTL expiration work after abnormal cache type transition #40226

Merged
merged 10 commits into from
Sep 12, 2024
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,8 @@ DEFINE_mInt64(file_cache_ttl_valid_check_interval_second, "0"); // zero for not
// If true, evict the ttl cache using LRU when full.
// Otherwise, only expiration can evict ttl and new data won't add to cache when full.
DEFINE_Bool(enable_ttl_cache_evict_using_lru, "true");
// rename ttl filename to new format during read, with some performance cost
DEFINE_mBool(translate_to_new_ttl_format_during_read, "false");

DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,8 @@ DECLARE_mInt64(file_cache_ttl_valid_check_interval_second);
// If true, evict the ttl cache using LRU when full.
// Otherwise, only expiration can evict ttl and new data won't add to cache when full.
DECLARE_Bool(enable_ttl_cache_evict_using_lru);
// rename ttl filename to new format during read, with some performance cost
DECLARE_Bool(translate_to_new_ttl_format_during_read);

// inverted index searcher cache
// cache entry stay time after lookup
Expand Down
49 changes: 32 additions & 17 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte
/// find list [block1, ..., blockN] of blocks which intersect with given range.
auto it = _files.find(hash);
if (it == _files.end()) {
if (_lazy_open_done) {
if (_async_open_done) {
return {};
}
FileCacheKey key;
Expand All @@ -285,11 +285,10 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte
if (!st.ok()) {
LOG_WARNING("Failed to change key meta").error(st);
}
}
for (auto& [_, cell] : file_blocks) {

FileCacheType origin_type = cell.file_block->cache_type();
if (origin_type == FileCacheType::TTL) continue;
Status st = cell.file_block->change_cache_type_by_mgr(FileCacheType::TTL);
st = cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::TTL);
if (st.ok()) {
auto& queue = get_queue(origin_type);
queue.remove(cell.queue_iterator.value(), cache_lock);
Expand All @@ -309,6 +308,7 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte
_time_to_key.insert(std::make_pair(context.expiration_time, hash));
}
if (auto iter = _key_to_time.find(hash);
// TODO(zhengyu): Why the hell the type is NORMAL while context set expiration_time?
(context.cache_type == FileCacheType::NORMAL || context.cache_type == FileCacheType::TTL) &&
iter != _key_to_time.end() && iter->second != context.expiration_time) {
// remove from _time_to_key
Expand All @@ -330,7 +330,8 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte
for (auto& [_, cell] : file_blocks) {
auto cache_type = cell.file_block->cache_type();
if (cache_type != FileCacheType::TTL) continue;
auto st = cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL);
auto st = cell.file_block->change_cache_type_between_ttl_and_others(
FileCacheType::NORMAL);
if (st.ok()) {
if (config::enable_ttl_cache_evict_using_lru) {
auto& ttl_queue = get_queue(FileCacheType::TTL);
Expand Down Expand Up @@ -699,9 +700,9 @@ BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& ha
FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);
Status st;
if (context.expiration_time == 0 && context.cache_type == FileCacheType::TTL) {
st = cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL);
st = cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::NORMAL);
} else if (context.cache_type != FileCacheType::TTL && context.expiration_time != 0) {
st = cell.file_block->change_cache_type_by_mgr(FileCacheType::TTL);
st = cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::TTL);
}
if (!st.ok()) {
LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
Expand Down Expand Up @@ -912,8 +913,8 @@ bool BlockFileCache::try_reserve_for_ttl(size_t size, std::lock_guard<std::mutex
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
size_t offset, size_t size,
std::lock_guard<std::mutex>& cache_lock) {
if (!_lazy_open_done) {
return try_reserve_for_lazy_load(size, cache_lock);
if (!_async_open_done) {
return try_reserve_during_async_load(size, cache_lock);
}

// use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining
Expand Down Expand Up @@ -1022,10 +1023,10 @@ bool BlockFileCache::remove_if_ttl_file_unlock(const UInt128Wrapper& file_key, b
LOG_WARNING("Failed to update expiration time to 0").error(st);
}
}
}
for (auto& [_, cell] : _files[file_key]) {

if (cell.file_block->cache_type() == FileCacheType::NORMAL) continue;
auto st = cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL);
auto st = cell.file_block->change_cache_type_between_ttl_and_others(
FileCacheType::NORMAL);
if (st.ok()) {
if (config::enable_ttl_cache_evict_using_lru) {
ttl_queue.remove(cell.queue_iterator.value(), cache_lock);
Expand Down Expand Up @@ -1396,6 +1397,21 @@ std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
return result.str();
}

std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
std::lock_guard cache_lock(_mutex);
return dump_single_cache_type_unlocked(hash, offset, cache_lock);
}

std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
size_t offset,
std::lock_guard<std::mutex>&) {
std::stringstream result;
const auto& cells_by_offset = _files[hash];
const auto& cell = cells_by_offset.find(offset);

return cache_type_to_string(cell->second.file_block->cache_type());
}

void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset,
FileCacheType new_type,
std::lock_guard<std::mutex>& cache_lock) {
Expand Down Expand Up @@ -1621,11 +1637,10 @@ void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash,
if (!st.ok()) {
LOG_WARNING("").error(st);
}
}
for (auto& [_, cell] : iter->second) {

FileCacheType origin_type = cell.file_block->cache_type();
if (origin_type == FileCacheType::TTL) continue;
auto st = cell.file_block->change_cache_type_by_mgr(FileCacheType::TTL);
st = cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::TTL);
if (st.ok()) {
auto& queue = get_queue(origin_type);
queue.remove(cell.queue_iterator.value(), cache_lock);
Expand Down Expand Up @@ -1672,8 +1687,8 @@ BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
return blocks_meta;
}

bool BlockFileCache::try_reserve_for_lazy_load(size_t size,
std::lock_guard<std::mutex>& cache_lock) {
bool BlockFileCache::try_reserve_during_async_load(size_t size,
std::lock_guard<std::mutex>& cache_lock) {
size_t removed_size = 0;
size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
Expand Down
12 changes: 8 additions & 4 deletions be/src/io/cache/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ class BlockFileCache {
std::string reset_capacity(size_t new_capacity);

std::map<size_t, FileBlockSPtr> get_blocks_by_key(const UInt128Wrapper& hash);
/// For debug.
/// For debug and UT
std::string dump_structure(const UInt128Wrapper& hash);
std::string dump_single_cache_type(const UInt128Wrapper& hash, size_t offset);

[[nodiscard]] size_t get_used_cache_size(FileCacheType type) const;

Expand All @@ -130,7 +131,7 @@ class BlockFileCache {
[[nodiscard]] std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
get_hot_blocks_meta(const UInt128Wrapper& hash) const;

[[nodiscard]] bool get_lazy_open_success() const { return _lazy_open_done; }
[[nodiscard]] bool get_async_open_success() const { return _async_open_done; }

BlockFileCache& operator=(const BlockFileCache&) = delete;
BlockFileCache(const BlockFileCache&) = delete;
Expand Down Expand Up @@ -338,7 +339,7 @@ class BlockFileCache {
const CacheContext& context, size_t offset, size_t size,
std::lock_guard<std::mutex>& cache_lock);

bool try_reserve_for_lazy_load(size_t size, std::lock_guard<std::mutex>& cache_lock);
bool try_reserve_during_async_load(size_t size, std::lock_guard<std::mutex>& cache_lock);

std::vector<FileCacheType> get_other_cache_type(FileCacheType cur_cache_type);

Expand All @@ -358,6 +359,9 @@ class BlockFileCache {
std::string dump_structure_unlocked(const UInt128Wrapper& hash,
std::lock_guard<std::mutex>& cache_lock);

std::string dump_single_cache_type_unlocked(const UInt128Wrapper& hash, size_t offset,
std::lock_guard<std::mutex>& cache_lock);

void fill_holes_with_empty_file_blocks(FileBlocks& file_blocks, const UInt128Wrapper& hash,
const CacheContext& context,
const FileBlock::Range& range,
Expand Down Expand Up @@ -413,7 +417,7 @@ class BlockFileCache {
std::mutex _close_mtx;
std::condition_variable _close_cv;
std::thread _cache_background_thread;
std::atomic_bool _lazy_open_done {false};
std::atomic_bool _async_open_done {false};
bool _async_clear_file_cache {false};
// disk space or inode is less than the specified value
bool _disk_resource_limit_mode {false};
Expand Down
40 changes: 23 additions & 17 deletions be/src/io/cache/file_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,32 +161,41 @@ Status FileBlock::read(Slice buffer, size_t read_offset) {
return _mgr->_storage->read(_key, read_offset, buffer);
}

Status FileBlock::change_cache_type_by_mgr(FileCacheType new_type) {
Status FileBlock::change_cache_type_between_ttl_and_others(FileCacheType new_type) {
std::lock_guard block_lock(_mutex);
DCHECK(new_type != _key.meta.type);
if (_download_state == State::DOWNLOADED) {
KeyMeta new_meta;
new_meta.expiration_time = _key.meta.expiration_time;
new_meta.type = new_type;
auto st = _mgr->_storage->change_key_meta(_key, new_meta);
TEST_SYNC_POINT_CALLBACK("FileBlock::change_cache_type", &st);
if (!st.ok()) return st;
bool expr = (new_type == FileCacheType::TTL || _key.meta.type == FileCacheType::TTL);
if (!expr) {
LOG(WARNING) << "none of the cache type is TTL"
<< ", hash: " << _key.hash.to_string() << ", offset: " << _key.offset
<< ", new type: " << BlockFileCache::cache_type_to_string(new_type)
<< ", old type: " << BlockFileCache::cache_type_to_string(_key.meta.type);
}
DCHECK(expr);

// change cache type between TTL to others don't need to rename the filename suffix
_key.meta.type = new_type;
return Status::OK();
}

Status FileBlock::change_cache_type_self(FileCacheType new_type) {
Status FileBlock::change_cache_type_between_normal_and_index(FileCacheType new_type) {
std::lock_guard cache_lock(_mgr->_mutex);
std::lock_guard block_lock(_mutex);
bool expr = (new_type != FileCacheType::TTL && _key.meta.type != FileCacheType::TTL);
if (!expr) {
LOG(WARNING) << "one of the cache type is TTL"
<< ", hash: " << _key.hash.to_string() << ", offset: " << _key.offset
<< ", new type: " << BlockFileCache::cache_type_to_string(new_type)
<< ", old type: " << BlockFileCache::cache_type_to_string(_key.meta.type);
}
DCHECK(expr);
if (_key.meta.type == FileCacheType::TTL || new_type == _key.meta.type) {
return Status::OK();
freemandealer marked this conversation as resolved.
Show resolved Hide resolved
}
if (_download_state == State::DOWNLOADED) {
KeyMeta new_meta;
new_meta.expiration_time = _key.meta.expiration_time;
new_meta.type = new_type;
RETURN_IF_ERROR(_mgr->_storage->change_key_meta(_key, new_meta));
Status st;
TEST_SYNC_POINT_CALLBACK("FileBlock::change_cache_type", &st);
RETURN_IF_ERROR(_mgr->_storage->change_key_meta_type(_key, new_type));
}
_mgr->change_cache_type(_key.hash, _block_range.left, new_type, cache_lock);
_key.meta.type = new_type;
Expand All @@ -196,10 +205,7 @@ Status FileBlock::change_cache_type_self(FileCacheType new_type) {
Status FileBlock::update_expiration_time(uint64_t expiration_time) {
std::lock_guard block_lock(_mutex);
if (_download_state == State::DOWNLOADED) {
KeyMeta new_meta;
new_meta.expiration_time = expiration_time;
new_meta.type = _key.meta.type;
auto st = _mgr->_storage->change_key_meta(_key, new_meta);
auto st = _mgr->_storage->change_key_meta_expiration(_key, expiration_time);
if (!st.ok() && !st.is<ErrorCode::NOT_FOUND>()) {
return st;
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/io/cache/file_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ class FileBlock {

std::string get_info_for_log() const;

[[nodiscard]] Status change_cache_type_by_mgr(FileCacheType new_type);
[[nodiscard]] Status change_cache_type_between_ttl_and_others(FileCacheType new_type);

[[nodiscard]] Status change_cache_type_self(FileCacheType new_type);
[[nodiscard]] Status change_cache_type_between_normal_and_index(FileCacheType new_type);

[[nodiscard]] Status update_expiration_time(uint64_t expiration_time);

Expand Down
4 changes: 3 additions & 1 deletion be/src/io/cache/file_cache_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ class FileCacheStorage {
// remove the block
virtual Status remove(const FileCacheKey& key) = 0;
// change the block meta
virtual Status change_key_meta(const FileCacheKey& key, const KeyMeta& new_meta) = 0;
virtual Status change_key_meta_type(const FileCacheKey& key, const FileCacheType type) = 0;
virtual Status change_key_meta_expiration(const FileCacheKey& key,
const uint64_t expiration) = 0;
// use when lazy load cache
virtual void load_blocks_directly_unlocked(BlockFileCache* _mgr, const FileCacheKey& key,
std::lock_guard<std::mutex>& cache_lock) {}
Expand Down
Loading
Loading