Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](memory) Avoid modify memory tracker when Allocator alloc memory #40110

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions be/src/olap/page_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,17 @@
namespace doris {
template <typename TAllocator>
PageBase<TAllocator>::PageBase(size_t b, bool use_cache, segment_v2::PageTypePB page_type)
: LRUCacheValueBase(), _size(b), _capacity(b) {
if (use_cache) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
StoragePageCache::instance()->mem_tracker(page_type));
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16));
} else {
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16));
}
: TAllocator(use_cache ? StoragePageCache::instance()->mem_tracker(page_type) : nullptr),
LRUCacheValueBase(),
_size(b),
_capacity(b) {
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16));
}

template <typename TAllocator>
PageBase<TAllocator>::~PageBase() {
if (_data != nullptr) {
DCHECK(_capacity != 0 && _size != 0);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(TAllocator::mem_tracker_);
TAllocator::free(_data, _capacity);
}
}
Expand Down
21 changes: 14 additions & 7 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -452,27 +452,34 @@ class SwitchThreadMemTrackerLimiter {
const std::shared_ptr<doris::MemTrackerLimiter>& mem_tracker) {
DCHECK(mem_tracker);
doris::ThreadLocalHandle::create_thread_local_if_not_exits();
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
if (mem_tracker != thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) {
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
}
}

explicit SwitchThreadMemTrackerLimiter(const doris::QueryThreadContext& query_thread_context) {
doris::ThreadLocalHandle::create_thread_local_if_not_exits();
DCHECK(thread_context()->task_id() ==
query_thread_context.query_id); // workload group alse not change
DCHECK(query_thread_context.query_mem_tracker);
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
query_thread_context.query_mem_tracker);
if (query_thread_context.query_mem_tracker !=
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) {
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
query_thread_context.query_mem_tracker);
}
}

~SwitchThreadMemTrackerLimiter() {
thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
if (_old_mem_tracker != nullptr) {
thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
}
doris::ThreadLocalHandle::del_thread_local_if_count_is_zero();
}

private:
std::shared_ptr<doris::MemTrackerLimiter> _old_mem_tracker;
std::shared_ptr<doris::MemTrackerLimiter> _old_mem_tracker {nullptr};
};

class AddThreadMemTrackerConsumer {
Expand Down
11 changes: 6 additions & 5 deletions be/src/util/byte_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ struct ByteBuffer : private Allocator<false> {
return Status::OK();
}

~ByteBuffer() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_);
Allocator<false>::free(ptr, capacity);
}
~ByteBuffer() { Allocator<false>::free(ptr, capacity); }
xinyiZzz marked this conversation as resolved.
Show resolved Hide resolved

void put_bytes(const char* data, size_t size) {
memcpy(ptr + pos, data, size);
Expand All @@ -69,7 +66,11 @@ struct ByteBuffer : private Allocator<false> {
size_t capacity;

private:
ByteBuffer(size_t capacity_) : pos(0), limit(capacity_), capacity(capacity_) {
ByteBuffer(size_t capacity_)
: Allocator(doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()),
pos(0),
limit(capacity_),
capacity(capacity_) {
ptr = reinterpret_cast<char*>(Allocator<false>::alloc(capacity_));
}
};
Expand Down
43 changes: 16 additions & 27 deletions be/src/vec/common/allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@
std::unordered_map<void*, size_t> RecordSizeMemoryAllocator::_allocated_sizes;
std::mutex RecordSizeMemoryAllocator::_mutex;

template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::Allocator(
const std::shared_ptr<doris::MemTrackerLimiter>& tracker) {
if (tracker) {
mem_tracker_ = tracker;
}
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::sys_memory_check(
size_t size) const {
Expand Down Expand Up @@ -211,41 +219,22 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::memory_

template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::consume_memory(
size_t size) {
// Usually, an object that inherits Allocator has the same TLS tracker for each alloc.
// If an object that inherits Allocator needs to be reused by multiple queries,
// it is necessary to switch the same tracker to TLS when calling alloc.
// However, in ORC Reader, ORC DataBuffer will be reused, but we cannot switch TLS tracker,
// so we update the Allocator tracker when the TLS tracker changes.
// note that the tracker in thread context when object that inherit Allocator is constructed may be
// no attach memory tracker in tls. usually the memory tracker is attached in tls only during the first alloc.
if (mem_tracker_ == nullptr ||
mem_tracker_->label() != doris::thread_context()->thread_mem_tracker()->label()) {
mem_tracker_ = doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
size_t size) const {
if (mem_tracker_) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_);
CONSUME_THREAD_MEM_TRACKER(size);
} else {
CONSUME_THREAD_MEM_TRACKER(size);
}
CONSUME_THREAD_MEM_TRACKER(size);
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::release_memory(
size_t size) const {
doris::ThreadContext* thread_context = doris::thread_context(true);
if ((thread_context && thread_context->thread_mem_tracker()->label() != "Orphan") ||
mem_tracker_ == nullptr) {
// If thread_context exist and the label of thread_mem_tracker not equal to `Orphan`,
// this means that in the scope of SCOPED_ATTACH_TASK,
// so thread_mem_tracker should be used to release memory.
// If mem_tracker_ is nullptr there is a scenario where an object that inherits Allocator
// has never called alloc, but free memory.
// in phmap, the memory alloced by an object may be transferred to another object and then free.
// in this case, thread context must attach a memory tracker other than Orphan,
// otherwise memory tracking will be wrong.
if (mem_tracker_) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_);
RELEASE_THREAD_MEM_TRACKER(size);
} else {
// if thread_context does not exist or the label of thread_mem_tracker is equal to
// `Orphan`, it usually happens during object destruction. This means that
// the scope of SCOPED_ATTACH_TASK has been left, so release memory using Allocator tracker.
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_);
RELEASE_THREAD_MEM_TRACKER(size);
}
}
Expand Down
11 changes: 10 additions & 1 deletion be/src/vec/common/allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,22 @@ class RecordSizeMemoryAllocator {
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
class Allocator {
public:
Allocator(const std::shared_ptr<doris::MemTrackerLimiter>& tracker = nullptr);

void sys_memory_check(size_t size) const;
void memory_tracker_check(size_t size) const;
// If sys memory or tracker exceeds the limit, but there is no external catch bad_alloc,
// alloc will continue to execute, so the consume memtracker is forced.
void memory_check(size_t size) const;
// Increases consumption of this tracker by 'bytes'.
void consume_memory(size_t size);
// some special cases:
// 1. objects that inherit Allocator will not be shared by multiple queries.
// non-compliant: page cache, ORC ByteBuffer.
// 2. objects that inherit Allocator will only free memory allocated by themselves.
// non-compliant: phmap, the memory alloced by an object may be transferred to another object and then free.
// 3. the memory tracker in TLS is the same during the construction of objects that inherit Allocator
// and during subsequent memory allocation.
void consume_memory(size_t size) const;
void release_memory(size_t size) const;
void throw_bad_alloc(const std::string& err) const;
#ifndef NDEBUG
Expand Down
Loading