Skip to content

Commit

Permalink
[branch-2.0](cherry-pick) Fix task repeat attach task DCHECK failed (#…
Browse files Browse the repository at this point in the history
…32914)

* 1

* 2
  • Loading branch information
xinyiZzz authored Mar 27, 2024
1 parent d933046 commit 2dca398
Show file tree
Hide file tree
Showing 11 changed files with 11 additions and 13 deletions.
4 changes: 4 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ void TaskWorkerPool::_alter_inverted_index_worker_thread_callback() {
alter_inverted_index_rq.tablet_id);
if (tablet_ptr != nullptr) {
EngineIndexChangeTask engine_task(alter_inverted_index_rq);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
status = _env->storage_engine()->execute_task(&engine_task);
} else {
status =
Expand Down Expand Up @@ -587,6 +588,7 @@ void TaskWorkerPool::_check_consistency_worker_thread_callback() {
EngineChecksumTask engine_task(check_consistency_req.tablet_id,
check_consistency_req.schema_hash,
check_consistency_req.version, &checksum);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
Status status = _env->storage_engine()->execute_task(&engine_task);
if (!status.ok()) {
LOG_WARNING("failed to check consistency")
Expand Down Expand Up @@ -1487,6 +1489,7 @@ void PushTaskPool::_push_worker_thread_callback() {
std::vector<TTabletInfo> tablet_infos;

EngineBatchLoadTask engine_task(push_req, &tablet_infos);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
auto status = _env->storage_engine()->execute_task(&engine_task);

// Return result to fe
Expand Down Expand Up @@ -1893,6 +1896,7 @@ void CloneTaskPool::_clone_worker_thread_callback() {
std::vector<TTabletInfo> tablet_infos;
EngineCloneTask engine_task(clone_req, _master_info, agent_task_req.signature,
&tablet_infos);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
auto status = _env->storage_engine()->execute_task(&engine_task);
// Return result to fe
TFinishTaskRequest finish_task_request;
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/task/engine_batch_load_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vector<TTablet
EngineBatchLoadTask::~EngineBatchLoadTask() {}

Status EngineBatchLoadTask::execute() {
SCOPED_ATTACH_TASK(_mem_tracker);
Status status;
if (_push_req.push_type == TPushType::LOAD_V2) {
RETURN_IF_ERROR(_init());
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/task/engine_batch_load_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "olap/task/engine_task.h"

namespace doris {
class MemTrackerLimiter;
class TPushReq;
class TTabletInfo;

Expand Down Expand Up @@ -71,7 +70,6 @@ class EngineBatchLoadTask : public EngineTask {
std::vector<TTabletInfo>* _tablet_infos;
std::string _remote_file_path;
std::string _local_file_path;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
}; // class EngineBatchLoadTask
} // namespace doris
#endif // DORIS_BE_SRC_OLAP_TASK_ENGINE_BATCH_LOAD_TASK_H
1 change: 0 additions & 1 deletion be/src/olap/task/engine_checksum_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_h
}

Status EngineChecksumTask::execute() {
SCOPED_ATTACH_TASK(_mem_tracker);
return _compute_checksum();
} // execute

Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/task/engine_checksum_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include "olap/task/engine_task.h"

namespace doris {
class MemTrackerLimiter;

// base class for storage engine
// add "Engine" as task prefix to prevent duplicate name with agent task
Expand All @@ -49,7 +48,6 @@ class EngineChecksumTask : public EngineTask {
TSchemaHash _schema_hash;
TVersion _version;
uint32_t* _checksum;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
}; // EngineTask

} // namespace doris
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo&

Status EngineCloneTask::execute() {
// register the tablet to avoid it is deleted by gc thread during clone process
SCOPED_ATTACH_TASK(_mem_tracker);
if (!StorageEngine::instance()->tablet_manager()->register_clone_tablet(_clone_req.tablet_id)) {
return Status::InternalError("tablet {} is under clone", _clone_req.tablet_id);
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/task/engine_clone_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

namespace doris {
class DataDir;
class MemTrackerLimiter;
class TCloneReq;
class TMasterInfo;
class TTabletInfo;
Expand Down Expand Up @@ -95,7 +94,6 @@ class EngineCloneTask : public EngineTask {
const TMasterInfo& _master_info;
int64_t _copy_size;
int64_t _copy_time_ms;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
}; // EngineTask

} // namespace doris
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/task/engine_index_change_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#include "olap/task/engine_index_change_task.h"

#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/thread_context.h"

namespace doris {
Expand All @@ -33,7 +33,6 @@ EngineIndexChangeTask::EngineIndexChangeTask(
}

Status EngineIndexChangeTask::execute() {
SCOPED_ATTACH_TASK(_mem_tracker);
DorisMetrics::instance()->alter_inverted_index_requests_total->increment(1);
uint64_t start = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/task/engine_index_change_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ class EngineIndexChangeTask : public EngineTask {

private:
const TAlterInvertedIndexReq& _alter_inverted_index_req;

std::shared_ptr<MemTrackerLimiter> _mem_tracker;
}; // EngineTask

} // namespace doris
5 changes: 5 additions & 0 deletions be/src/olap/task/engine_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@

namespace doris {

class MemTrackerLimiter;

// base class for storage engine
// add "Engine" as task prefix to prevent duplicate name with agent task
class EngineTask {
public:
virtual ~EngineTask() = default;
virtual Status execute() { return Status::OK(); }
virtual Status finish() { return Status::OK(); }
std::shared_ptr<MemTrackerLimiter> mem_tracker() const { return _mem_tracker; }

std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};

} // end namespace doris
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ class ThreadContext {
DCHECK(mem_tracker);
// Orphan is thread default tracker.
DCHECK(thread_mem_tracker()->label() == "Orphan")
<< ", thread mem tracker label: " << thread_mem_tracker()->label()
<< ", attach mem tracker label: " << mem_tracker->label();
#endif
_task_id = task_id;
Expand Down

0 comments on commit 2dca398

Please sign in to comment.