From 11971eddb49fe670c3ef11bcb77f407d3b97661d Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Fri, 17 May 2024 22:47:00 +0800 Subject: [PATCH] [atomicstatus](be) add atomic status to share state between multi thread (#35002) --- be/src/common/status.h | 44 +++++++++++++++++++ .../vec/sink/writer/async_result_writer.cpp | 16 +++---- be/src/vec/sink/writer/async_result_writer.h | 8 ++-- 3 files changed, 53 insertions(+), 15 deletions(-) diff --git a/be/src/common/status.h b/be/src/common/status.h index 39cb205ce896a3..cf9b42a3c69668 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -542,6 +542,50 @@ class [[nodiscard]] Status { } }; +// There are many thread using status to indicate the cancel state, one thread may update it and +// the other thread will read it. Status is not thread safe, for example, if one thread is update it +// and another thread is call to_string method, it may core, because the _err_msg is an unique ptr and +// it is deconstructed during copy method. +// And also we could not use lock, because we need get status frequently to check if it is cancelled. +// The defaule value is ok. +class AtomicStatus { +public: + AtomicStatus() : error_st_(Status::OK()) {} + + bool ok() const { return error_code_.load() == 0; } + + bool update(const Status& new_status) { + // If new status is normal, or the old status is abnormal, then not need update + if (new_status.ok() || error_code_.load() != 0) { + return false; + } + int16_t expected_error_code = 0; + if (error_code_.compare_exchange_strong(expected_error_code, new_status.code(), + std::memory_order_acq_rel)) { + // lock here for read status, to avoid core during return error_st_ + std::lock_guard l(mutex_); + error_st_ = new_status; + return true; + } else { + return false; + } + } + + // will copy a new status object to avoid concurrency + Status status() { + std::lock_guard l(mutex_); + return error_st_; + } + +private: + std::atomic_int16_t error_code_ = 0; + Status error_st_; + std::mutex mutex_; + + AtomicStatus(const AtomicStatus&) = delete; + void operator=(const AtomicStatus&) = delete; +}; + inline std::ostream& operator<<(std::ostream& ostr, const Status& status) { ostr << '[' << status.code_as_string() << ']'; ostr << status.msg(); diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 9a84f374464a10..4ed878a4634476 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -55,7 +55,7 @@ Status AsyncResultWriter::sink(Block* block, bool eos) { // if io task failed, just return error status to // end the query if (!_writer_status.ok()) { - return _writer_status; + return _writer_status.status(); } if (_dependency && _is_finished()) { @@ -143,7 +143,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi auto status = write(*block); if (!status.ok()) [[unlikely]] { std::unique_lock l(_m); - _writer_status = status; + _writer_status.update(status); if (_dependency && _is_finished()) { _dependency->set_ready(); } @@ -172,14 +172,10 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi // Should not call finish in lock because it may hang, and it will lock _m too long. // And get_writer_status will also need this lock, it will block pipeline exec thread. Status st = finish(state); - std::lock_guard l(_m); - _writer_status = st; + _writer_status.update(st); } Status st = Status::OK(); - { - std::lock_guard l(_m); - st = _writer_status; - } + { st = _writer_status.status(); } Status close_st = close(st); { @@ -187,7 +183,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi // the real reason. std::lock_guard l(_m); if (_writer_status.ok()) { - _writer_status = close_st; + _writer_status.update(close_st); } _writer_thread_closed = true; } @@ -215,7 +211,7 @@ Status AsyncResultWriter::_projection_block(doris::vectorized::Block& input_bloc void AsyncResultWriter::force_close(Status s) { std::lock_guard l(_m); - _writer_status = s; + _writer_status.update(s); if (_dependency && _is_finished()) { _dependency->set_ready(); } diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h index 7f9700486da80b..1fd0fc280f0087 100644 --- a/be/src/vec/sink/writer/async_result_writer.h +++ b/be/src/vec/sink/writer/async_result_writer.h @@ -78,10 +78,7 @@ class AsyncResultWriter : public ResultWriter { // Add the IO thread task process block() to thread pool to dispose the IO Status start_writer(RuntimeState* state, RuntimeProfile* profile); - Status get_writer_status() { - std::lock_guard l(_m); - return _writer_status; - } + Status get_writer_status() { return _writer_status.status(); } protected: Status _projection_block(Block& input_block, Block* output_block); @@ -103,7 +100,8 @@ class AsyncResultWriter : public ResultWriter { std::mutex _m; std::condition_variable _cv; std::deque> _data_queue; - Status _writer_status = Status::OK(); + // Default value is ok + AtomicStatus _writer_status; bool _eos = false; // The writer is not started at the beginning. If prepare failed but not open, the the writer // is not started, so should not pending finish on it.