Skip to content

Commit

Permalink
[atomicstatus](be) add atomic status to share state between multi thr…
Browse files Browse the repository at this point in the history
…ead (#35002)
  • Loading branch information
yiguolei authored and Doris-Extras committed May 21, 2024
1 parent 9bc2c88 commit 11971ed
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 15 deletions.
44 changes: 44 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,50 @@ class [[nodiscard]] Status {
}
};

// There are many thread using status to indicate the cancel state, one thread may update it and
// the other thread will read it. Status is not thread safe, for example, if one thread is update it
// and another thread is call to_string method, it may core, because the _err_msg is an unique ptr and
// it is deconstructed during copy method.
// And also we could not use lock, because we need get status frequently to check if it is cancelled.
// The defaule value is ok.
class AtomicStatus {
public:
AtomicStatus() : error_st_(Status::OK()) {}

bool ok() const { return error_code_.load() == 0; }

bool update(const Status& new_status) {
// If new status is normal, or the old status is abnormal, then not need update
if (new_status.ok() || error_code_.load() != 0) {
return false;
}
int16_t expected_error_code = 0;
if (error_code_.compare_exchange_strong(expected_error_code, new_status.code(),
std::memory_order_acq_rel)) {
// lock here for read status, to avoid core during return error_st_
std::lock_guard l(mutex_);
error_st_ = new_status;
return true;
} else {
return false;
}
}

// will copy a new status object to avoid concurrency
Status status() {
std::lock_guard l(mutex_);
return error_st_;
}

private:
std::atomic_int16_t error_code_ = 0;
Status error_st_;
std::mutex mutex_;

AtomicStatus(const AtomicStatus&) = delete;
void operator=(const AtomicStatus&) = delete;
};

inline std::ostream& operator<<(std::ostream& ostr, const Status& status) {
ostr << '[' << status.code_as_string() << ']';
ostr << status.msg();
Expand Down
16 changes: 6 additions & 10 deletions be/src/vec/sink/writer/async_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
// if io task failed, just return error status to
// end the query
if (!_writer_status.ok()) {
return _writer_status;
return _writer_status.status();
}

if (_dependency && _is_finished()) {
Expand Down Expand Up @@ -143,7 +143,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
auto status = write(*block);
if (!status.ok()) [[unlikely]] {
std::unique_lock l(_m);
_writer_status = status;
_writer_status.update(status);
if (_dependency && _is_finished()) {
_dependency->set_ready();
}
Expand Down Expand Up @@ -172,22 +172,18 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
// Should not call finish in lock because it may hang, and it will lock _m too long.
// And get_writer_status will also need this lock, it will block pipeline exec thread.
Status st = finish(state);
std::lock_guard l(_m);
_writer_status = st;
_writer_status.update(st);
}
Status st = Status::OK();
{
std::lock_guard l(_m);
st = _writer_status;
}
{ st = _writer_status.status(); }

Status close_st = close(st);
{
// If it is already failed before, then not update the write status so that we could get
// the real reason.
std::lock_guard l(_m);
if (_writer_status.ok()) {
_writer_status = close_st;
_writer_status.update(close_st);
}
_writer_thread_closed = true;
}
Expand Down Expand Up @@ -215,7 +211,7 @@ Status AsyncResultWriter::_projection_block(doris::vectorized::Block& input_bloc

void AsyncResultWriter::force_close(Status s) {
std::lock_guard l(_m);
_writer_status = s;
_writer_status.update(s);
if (_dependency && _is_finished()) {
_dependency->set_ready();
}
Expand Down
8 changes: 3 additions & 5 deletions be/src/vec/sink/writer/async_result_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,7 @@ class AsyncResultWriter : public ResultWriter {
// Add the IO thread task process block() to thread pool to dispose the IO
Status start_writer(RuntimeState* state, RuntimeProfile* profile);

Status get_writer_status() {
std::lock_guard l(_m);
return _writer_status;
}
Status get_writer_status() { return _writer_status.status(); }

protected:
Status _projection_block(Block& input_block, Block* output_block);
Expand All @@ -103,7 +100,8 @@ class AsyncResultWriter : public ResultWriter {
std::mutex _m;
std::condition_variable _cv;
std::deque<std::unique_ptr<Block>> _data_queue;
Status _writer_status = Status::OK();
// Default value is ok
AtomicStatus _writer_status;
bool _eos = false;
// The writer is not started at the beginning. If prepare failed but not open, the the writer
// is not started, so should not pending finish on it.
Expand Down

0 comments on commit 11971ed

Please sign in to comment.