Skip to content

Commit

Permalink
[fix](move-memtable) add timeout for load stream close wait (#27439)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored Nov 30, 2023
1 parent 34e53ac commit 112ae59
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 7 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,8 @@ DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
DEFINE_Bool(share_delta_writers, "true");
// timeout for open load stream rpc in ms
DEFINE_Int64(open_load_stream_timeout_ms, "500");
// timeout for load stream close wait in ms
DEFINE_Int64(close_load_stream_timeout_ms, "600000"); // 10 min

// idle timeout for load stream in ms
DEFINE_Int64(load_stream_idle_timeout_ms, "600000");
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,8 @@ DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
DECLARE_Bool(share_delta_writers);
// timeout for open load stream rpc in ms
DECLARE_Int64(open_load_stream_timeout_ms);
// timeout for load stream close wait in ms
DECLARE_Int64(close_load_stream_timeout_ms);

// idle timeout for load stream in ms
DECLARE_Int64(load_stream_idle_timeout_ms);
Expand Down
13 changes: 6 additions & 7 deletions be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,13 @@ class LoadStreamStub {
bool is_closed() { return _is_closed.load(); }

Status close_wait(int64_t timeout_ms) {
DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
std::unique_lock<bthread::Mutex> lock(_mutex);
if (_is_closed) {
return Status::OK();
}
if (timeout_ms > 0) {
int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
return ret == 0 ? Status::OK()
: Status::Error<true>(ret, "stream close_wait timeout");
}
_close_cv.wait(lock);
return Status::OK();
int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
return ret == 0 ? Status::OK() : Status::Error<true>(ret, "stream close_wait timeout");
};

std::vector<int64_t> success_tablets() {
Expand Down Expand Up @@ -184,6 +180,9 @@ class LoadStreamStub {
if (!_is_init.load() || _handler.is_closed()) {
return Status::OK();
}
if (timeout_ms <= 0) {
timeout_ms = config::close_load_stream_timeout_ms;
}
return _handler.close_wait(timeout_ms);
}

Expand Down

0 comments on commit 112ae59

Please sign in to comment.