diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index cccb1e2033e425..2c1080ec0ad858 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -748,6 +748,8 @@ DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1"); DEFINE_Bool(share_delta_writers, "true"); // timeout for open load stream rpc in ms DEFINE_Int64(open_load_stream_timeout_ms, "500"); +// timeout for load stream close wait in ms +DEFINE_Int64(close_load_stream_timeout_ms, "600000"); // 10 min // idle timeout for load stream in ms DEFINE_Int64(load_stream_idle_timeout_ms, "600000"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 4a1c07149afd2d..ddc47c678c64a3 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -809,6 +809,8 @@ DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio); DECLARE_Bool(share_delta_writers); // timeout for open load stream rpc in ms DECLARE_Int64(open_load_stream_timeout_ms); +// timeout for load stream close wait in ms +DECLARE_Int64(close_load_stream_timeout_ms); // idle timeout for load stream in ms DECLARE_Int64(load_stream_idle_timeout_ms); diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index b8c3be255c97c5..aca2e9ea5508ba 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -99,17 +99,13 @@ class LoadStreamStub { bool is_closed() { return _is_closed.load(); } Status close_wait(int64_t timeout_ms) { + DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0"; std::unique_lock lock(_mutex); if (_is_closed) { return Status::OK(); } - if (timeout_ms > 0) { - int ret = _close_cv.wait_for(lock, timeout_ms * 1000); - return ret == 0 ? Status::OK() - : Status::Error(ret, "stream close_wait timeout"); - } - _close_cv.wait(lock); - return Status::OK(); + int ret = _close_cv.wait_for(lock, timeout_ms * 1000); + return ret == 0 ? Status::OK() : Status::Error(ret, "stream close_wait timeout"); }; std::vector success_tablets() { @@ -184,6 +180,9 @@ class LoadStreamStub { if (!_is_init.load() || _handler.is_closed()) { return Status::OK(); } + if (timeout_ms <= 0) { + timeout_ms = config::close_load_stream_timeout_ms; + } return _handler.close_wait(timeout_ms); }