Skip to content

Commit

Permalink
Revert "[enhancement](mem-tracker) Use thread local mem tracker to tr…
Browse files Browse the repository at this point in the history
…ack s3 file buffer memory usage" (#41360)

Reverts #40597

This PR cased memtable memtracker check in cloud_p0 to fail.

```
*** Check failure stack trace: ***
[17:31:12 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_CloudP0/536070?buildTab=log&linesState=43518&logView=flowAware&focusLine=43518)  F20240926 17:27:30.342573 15767 memtable.cpp:145] memtable flush success but cosumption is not 0, it is 5242880F20240926 17:27:30.357038 15763 memtable.cpp:145] memtable flush success but cosumption is not 0, it is 5242880F20240926 17:27:30.357043 15762
[17:31:12 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_CloudP0/536070?buildTab=log&linesState=43519&logView=flowAware&focusLine=43519)   mtable.cpp:145] memtable flush success but cosumption is not 0, it is 5242880
[17:31:12 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_CloudP0/536070?buildTab=log&linesState=43520&logView=flowAware&focusLine=43520)  *** Check failure stack trace: ***
[17:31:12 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_CloudP0/536070?buildTab=log&linesState=43521&logView=flowAware&focusLine=43521)  F20240926 17:27:30.342573 15767 memtable.cpp:145] memtable flush success but cosumption is not 0, it is 5242880F20240926 17:27:30.357038 15763 memtable.cpp:145] memtable flush success but cosumption is not 0, it is 5242880F20240926 17:27:30.357043 15762
[17:31:12 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_CloudP0/536070?buildTab=log&linesState=43522&logView=flowAware&focusLine=43522)   mtable.cpp:145] memtable flush success but cosumption is not 0, it is 5242880F20240926 17:27:30.358129 15765 memtable.cpp:145] memtable flush success but cosumption is not 0, it is 5242880
[17:31:12 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_CloudP0/536070?buildTab=log&linesState=43523&logView=flowAware&focusLine=43523)  *** Check failure stack trace: ***
[17:31:12 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_CloudP0/536070?buildTab=log&linesState=43524&logView=flowAware&focusLine=43524)  F20240926 17:27:30.342573 15767 memtable.cpp:145] memtable flush success but cosumption is not 0, it is 5242880F20240926 17:27:30.357038 15763 memtable.cpp:145] memtable flush success but cosumption is not 0, it is 5242880F20240926 17:27:30.357043 15762
[17:31:12 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_CloudP0/536070?buildTab=log&linesState=43525&logView=flowAware&focusLine=43525)   mtable.cpp:145] memtable flush success but cosumption is not 0, it is 5242880F20240926 17:27:30.358129 15765 memtable.cpp:145] memtable flush success but cosumption is not 0, it is 5242880F20240926 17:27:30.358175 15766 memtable.cpp:145] memtable flush success but cosumption is not 0, it is 5242880
[17:31:12 ](http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_CloudP0/536070?buildTab=log&linesState=43526&logView=flowAware&focusLine=43526)  *** Check failure stack trace: ***
```


http://43.132.222.7:8111/buildConfiguration/Doris_DorisRegression_CloudP0/536070?buildTab=log&linesState=25290&logView=flowAware&focusLine=44023
  • Loading branch information
kaijchen authored and dataroaring committed Oct 5, 2024
1 parent 3107130 commit b0e20ac
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 44 deletions.
25 changes: 6 additions & 19 deletions be/src/io/fs/s3_file_bufferpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include "io/cache/file_cache_common.h"
#include "io/fs/s3_common.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/thread_context.h"
#include "util/defer_op.h"
#include "util/slice.h"
Expand Down Expand Up @@ -78,19 +77,17 @@ Slice FileBuffer::get_slice() const {
}

FileBuffer::FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()> alloc_holder,
size_t offset, OperationState state,
std::shared_ptr<MemTrackerLimiter> mem_tracker)
size_t offset, OperationState state)
: _type(type),
_alloc_holder(std::move(alloc_holder)),
_offset(offset),
_size(0),
_state(std::move(state)),
_inner_data(std::make_unique<FileBuffer::PartData>()),
_capacity(_inner_data->size()),
_mem_tracker(std::move(mem_tracker)) {}
_capacity(_inner_data->size()) {}

FileBuffer::~FileBuffer() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker());
_inner_data.reset();
}

Expand Down Expand Up @@ -243,31 +240,21 @@ FileBufferBuilder& FileBufferBuilder::set_allocate_file_blocks_holder(
}

Status FileBufferBuilder::build(std::shared_ptr<FileBuffer>* buf) {
auto mem_tracker = ExecEnv::GetInstance()->s3_file_buffer_tracker();
auto* thread_ctx = doris::thread_context(true);
if (thread_ctx != nullptr) {
// if thread local mem tracker is set, use it instead.
auto curr_tracker = thread_ctx->thread_mem_tracker_mgr->limiter_mem_tracker();
if (curr_tracker != ExecEnv::GetInstance()->orphan_mem_tracker()) {
mem_tracker = std::move(curr_tracker);
}
}
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker());
OperationState state(_sync_after_complete_task, _is_cancelled);

if (_type == BufferType::UPLOAD) {
RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<UploadFileBuffer>(
std::move(_upload_cb), std::move(state), _offset,
std::move(_alloc_holder_cb), std::move(mem_tracker)));
std::move(_alloc_holder_cb)));
return Status::OK();
}
if (_type == BufferType::DOWNLOAD) {
RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<DownloadFileBuffer>(
std::move(_download),
std::move(_write_to_local_file_cache),
std::move(_write_to_use_buffer), std::move(state),
_offset, std::move(_alloc_holder_cb),
std::move(mem_tracker)));
_offset, std::move(_alloc_holder_cb)));
return Status::OK();
}
// should never come here
Expand Down
14 changes: 5 additions & 9 deletions be/src/io/fs/s3_file_bufferpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

#include "common/status.h"
#include "io/cache/file_block.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "util/crc32c.h"
#include "util/slice.h"
#include "util/threadpool.h"
Expand Down Expand Up @@ -78,7 +77,7 @@ struct OperationState {

struct FileBuffer {
FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()> alloc_holder, size_t offset,
OperationState state, std::shared_ptr<MemTrackerLimiter> mem_tracker);
OperationState state);
virtual ~FileBuffer();
/**
* submit the correspoding task to async executor
Expand Down Expand Up @@ -128,16 +127,14 @@ struct FileBuffer {
struct PartData;
std::unique_ptr<PartData> _inner_data;
size_t _capacity;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};

struct DownloadFileBuffer final : public FileBuffer {
DownloadFileBuffer(std::function<Status(Slice&)> download,
std::function<void(FileBlocksHolderPtr, Slice)> write_to_cache,
std::function<void(Slice, size_t)> write_to_use_buffer, OperationState state,
size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder,
std::shared_ptr<MemTrackerLimiter> mem_tracker)
: FileBuffer(BufferType::DOWNLOAD, alloc_holder, offset, state, std::move(mem_tracker)),
size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder)
: FileBuffer(BufferType::DOWNLOAD, alloc_holder, offset, state),
_download(std::move(download)),
_write_to_local_file_cache(std::move(write_to_cache)),
_write_to_use_buffer(std::move(write_to_use_buffer)) {}
Expand All @@ -156,9 +153,8 @@ struct DownloadFileBuffer final : public FileBuffer {

struct UploadFileBuffer final : public FileBuffer {
UploadFileBuffer(std::function<void(UploadFileBuffer&)> upload_cb, OperationState state,
size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder,
std::shared_ptr<MemTrackerLimiter> mem_tracker)
: FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state, std::move(mem_tracker)),
size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder)
: FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state),
_upload_to_remote(std::move(upload_cb)) {}
~UploadFileBuffer() override = default;
Status append_data(const Slice& s) override;
Expand Down
6 changes: 0 additions & 6 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@
#include "olap/txn_manager.h"
#include "olap/types.h"
#include "olap/utils.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "segment_loader.h"
#include "service/point_query_executor.h"
#include "tablet.h"
Expand Down Expand Up @@ -269,9 +268,6 @@ Tablet::Tablet(StorageEngine& engine, TabletMetaSharedPtr tablet_meta, DataDir*
_tablet_path = fmt::format("{}/{}/{}/{}/{}", _data_dir->path(), DATA_PREFIX,
_tablet_meta->shard_id(), tablet_id(), schema_hash());
}
_upload_cooldown_meta_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER,
fmt::format("UploadCoolDownMeta#tablet_id={}", tablet_id()));
}

bool Tablet::set_tablet_schema_into_rowset_meta() {
Expand Down Expand Up @@ -2101,8 +2097,6 @@ Status Tablet::write_cooldown_meta() {
_cooldown_conf.cooldown_replica_id, tablet_id());
}

SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_upload_cooldown_meta_tracker);

auto storage_resource = DORIS_TRY(get_resource_by_storage_policy_id(storage_policy_id()));

std::vector<RowsetMetaSharedPtr> cooldowned_rs_metas;
Expand Down
3 changes: 0 additions & 3 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
#include "olap/rowset/rowset_reader.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/version_graph.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "segment_loader.h"
#include "util/metrics.h"
#include "util/once.h"
Expand Down Expand Up @@ -609,8 +608,6 @@ class Tablet final : public BaseTablet {
std::shared_ptr<const VersionWithTime> _visible_version;

std::atomic_bool _is_full_compaction_running = false;

std::shared_ptr<MemTrackerLimiter> _upload_cooldown_meta_tracker;
};

inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() {
Expand Down
5 changes: 0 additions & 5 deletions be/src/runtime/snapshot_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
#include "olap/tablet_manager.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "util/s3_uri.h"
#include "util/s3_util.h"
#include "util/thrift_rpc_helper.h"
Expand Down Expand Up @@ -116,9 +115,6 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l
} else {
return Status::InternalError("Unknown storage type: {}", type);
}
_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER,
fmt::format("SnapShotLoader#job_id={}#task_id={}", _job_id, _task_id));
return Status::OK();
}

Expand All @@ -129,7 +125,6 @@ Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_d
if (!_remote_fs) {
return Status::InternalError("Storage backend not initialized.");
}
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
LOG(INFO) << "begin to upload snapshot files. num: " << src_to_dest_path.size()
<< ", broker addr: " << _broker_addr << ", job: " << _job_id << ", task" << _task_id;

Expand Down
2 changes: 0 additions & 2 deletions be/src/runtime/snapshot_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

#include "common/status.h"
#include "olap/tablet_fwd.h"
#include "runtime/memory/mem_tracker_limiter.h"

namespace doris {
namespace io {
Expand Down Expand Up @@ -112,7 +111,6 @@ class SnapshotLoader {
const TNetworkAddress _broker_addr;
const std::map<std::string, std::string> _prop;
std::shared_ptr<io::RemoteFileSystem> _remote_fs;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};

} // end namespace doris

0 comments on commit b0e20ac

Please sign in to comment.