Skip to content

Commit

Permalink
up
Browse files Browse the repository at this point in the history
Signed-off-by: Jiajun Yao <[email protected]>
  • Loading branch information
jjyao committed Oct 11, 2024
1 parent e944542 commit b072f62
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 12 deletions.
6 changes: 4 additions & 2 deletions src/ray/rpc/gcs_server/gcs_rpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ class GcsRpcClient {
.gcs_client_check_connection_status_interval_milliseconds(),
/*server_unavailable_timeout_seconds=*/
::RayConfig::instance().gcs_rpc_server_reconnect_timeout_s(),
/*server_unavailable_timeout_callback=*/[]() {
/*server_unavailable_timeout_callback=*/
[]() {
RAY_LOG(ERROR) << "Failed to connect to GCS within "
<< ::RayConfig::instance().gcs_rpc_server_reconnect_timeout_s()
<< " seconds. "
Expand All @@ -200,7 +201,8 @@ class GcsRpcClient {
"configure-logging.html#logging-directory-structure. "
<< "The program will terminate.";
std::_Exit(EXIT_FAILURE);
});
},
/*server_name=*/"GCS");
}

template <typename Service,
Expand Down
20 changes: 13 additions & 7 deletions src/ray/rpc/retryable_grpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <chrono>

#include "absl/container/btree_map.h"
#include "absl/strings/str_format.h"
#include "ray/common/grpc_util.h"
#include "ray/rpc/client_call.h"
#include "ray/rpc/grpc_client.h"
Expand Down Expand Up @@ -72,14 +73,16 @@ class RetryableGrpcClient : public std::enable_shared_from_this<RetryableGrpcCli
uint64_t max_pending_requests_bytes,
uint64_t check_channel_status_interval_milliseconds,
uint64_t server_unavailable_timeout_seconds,
std::function<void()> server_unavailable_timeout_callback) {
std::function<void()> server_unavailable_timeout_callback,
const std::string &server_name) {
auto retryable_grpc_client = std::shared_ptr<RetryableGrpcClient>(
new RetryableGrpcClient(channel,
io_context,
max_pending_requests_bytes,
check_channel_status_interval_milliseconds,
server_unavailable_timeout_seconds,
server_unavailable_timeout_callback));
server_unavailable_timeout_callback,
server_name));
// SetupCheckTimer() MUST be called after we have the shared_ptr of
// RetryableGrpcClient since it calls shared_from_this()
// which requires a shared_ptr of this to exist.
Expand Down Expand Up @@ -186,7 +189,8 @@ class RetryableGrpcClient : public std::enable_shared_from_this<RetryableGrpcCli
uint64_t max_pending_requests_bytes,
uint64_t check_channel_status_interval_milliseconds,
uint64_t server_unavailable_timeout_seconds,
std::function<void()> server_unavailable_timeout_callback)
std::function<void()> server_unavailable_timeout_callback,
const std::string &server_name)
: io_context_(io_context),
timer_(std::make_unique<boost::asio::deadline_timer>(io_context)),
channel_(channel),
Expand All @@ -195,7 +199,8 @@ class RetryableGrpcClient : public std::enable_shared_from_this<RetryableGrpcCli
check_channel_status_interval_milliseconds),
server_unavailable_timeout_seconds_(server_unavailable_timeout_seconds),
server_unavailable_timeout_callback_(
std::move(server_unavailable_timeout_callback)) {}
std::move(server_unavailable_timeout_callback)),
server_name_(server_name) {}

void SetupCheckTimer() {
auto duration =
Expand Down Expand Up @@ -232,8 +237,8 @@ class RetryableGrpcClient : public std::enable_shared_from_this<RetryableGrpcCli
break;
}
auto [executor, request_bytes] = iter->second;
executor->Abort(ray::Status::TimedOut(
"Timed out while waiting for server to become available."));
executor->Abort(ray::Status::TimedOut(absl::StrFormat(
"Timed out while waiting for %s to become available.", server_name_)));
pending_requests_bytes_ -= request_bytes;
delete executor;
pending_requests_.erase(iter);
Expand All @@ -248,7 +253,7 @@ class RetryableGrpcClient : public std::enable_shared_from_this<RetryableGrpcCli
uint64_t server_unavailable_duration_seconds = static_cast<uint64_t>(
absl::ToInt64Seconds(absl::Now() - server_last_available_time_));
if (server_unavailable_duration_seconds >= server_unavailable_timeout_seconds_) {
RAY_LOG(WARNING) << "Server has been unavailable for "
RAY_LOG(WARNING) << server_name_ << " has been unavailable for "
<< server_unavailable_duration_seconds << " seconds";
server_unavailable_timeout_callback_();
}
Expand Down Expand Up @@ -287,6 +292,7 @@ class RetryableGrpcClient : public std::enable_shared_from_this<RetryableGrpcCli
uint64_t check_channel_status_interval_milliseconds_;
uint64_t server_unavailable_timeout_seconds_;
std::function<void()> server_unavailable_timeout_callback_;
std::string server_name_;

bool server_is_unavailable_ = false;
absl::Time server_last_available_time_ = absl::Now();
Expand Down
6 changes: 3 additions & 3 deletions src/ray/rpc/worker/core_worker_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,9 @@ class CoreWorkerClient : public std::enable_shared_from_this<CoreWorkerClient>,
.gcs_client_check_connection_status_interval_milliseconds(),
/*server_unavailable_timeout_seconds=*/
std::numeric_limits<uint64_t>::max(),
/*server_unavailable_timeout_callback=*/[]() {
RAY_LOG(FATAL) << "Server unavailable should never timeout";
});
/*server_unavailable_timeout_callback=*/
[]() { RAY_LOG(FATAL) << "Server unavailable should never timeout"; },
/*server_name=*/"Core worker " + addr_.ip_address());
};

// Returns a const reference to the address stored in this client (`addr_`).
const rpc::Address &Addr() const override { return addr_; }
Expand Down

0 comments on commit b072f62

Please sign in to comment.