Skip to content

Commit

Permalink
up
Browse files Browse the repository at this point in the history
Signed-off-by: Jiajun Yao <[email protected]>
  • Loading branch information
jjyao committed Oct 17, 2024
1 parent 8ea5340 commit bf8577e
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 18 deletions.
4 changes: 4 additions & 0 deletions python/ray/tests/test_streaming_generator_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ def test_ray_datasetlike_mini_stress_test(
"RAY_testing_asio_delay_us",
"CoreWorkerService.grpc_server.ReportGeneratorItemReturns=10000:1000000",
)
m.setenv(
"RAY_testing_rpc_failure",
"ray::rpc::CoreWorkerService.grpc_client.ReportGeneratorItemReturns=5",
)
cluster = ray_start_cluster
cluster.add_node(
num_cpus=1,
Expand Down
2 changes: 1 addition & 1 deletion src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ RAY_CONFIG(int32_t, gcs_grpc_initial_reconnect_backoff_ms, 100)
RAY_CONFIG(uint64_t, gcs_grpc_max_request_queued_max_bytes, 1024UL * 1024 * 1024 * 5)

/// The interval, in milliseconds, between two checks of the grpc channel connection status.
RAY_CONFIG(int32_t, gcs_client_check_connection_status_interval_milliseconds, 1000)
RAY_CONFIG(int32_t, grpc_client_check_connection_status_interval_milliseconds, 1000)

/// Due to the protocol drawback, raylet needs to refresh the message if
/// no message is received for a while.
Expand Down
2 changes: 1 addition & 1 deletion src/ray/rpc/gcs_server/gcs_rpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class GcsRpcClient {
::RayConfig::instance().gcs_grpc_max_request_queued_max_bytes(),
/*check_channel_status_interval_milliseconds=*/
::RayConfig::instance()
.gcs_client_check_connection_status_interval_milliseconds(),
.grpc_client_check_connection_status_interval_milliseconds(),
/*server_unavailable_timeout_seconds=*/
::RayConfig::instance().gcs_rpc_server_reconnect_timeout_s(),
/*server_unavailable_timeout_callback=*/
Expand Down
33 changes: 29 additions & 4 deletions src/ray/rpc/retryable_grpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ namespace rpc {
callback, \
method_timeout_ms))

// Define a void retryable RPC client method.
//
// Expands to a member function of the form
//   void METHOD(const METHOD##Request &request,
//               const ClientCallback<METHOD##Reply> &callback) SPECS
// whose body hands `request` and `callback` to INVOKE_RETRYABLE_RPC_CALL.
//
// Macro parameters:
//   retryable_rpc_client - passed through as the first argument of
//                          INVOKE_RETRYABLE_RPC_CALL.
//   SERVICE              - service name token, passed through unchanged.
//   METHOD               - method name token; also used to form the
//                          METHOD##Request and METHOD##Reply type names.
//   rpc_client           - passed through as the underlying client argument.
//   method_timeout_ms    - passed through as the per-call timeout argument.
//   SPECS                - tokens placed after the parameter list of the
//                          generated function (e.g. `override`).
#define VOID_RETRYABLE_RPC_CLIENT_METHOD( \
retryable_rpc_client, SERVICE, METHOD, rpc_client, method_timeout_ms, SPECS) \
void METHOD(const METHOD##Request &request, \
const ClientCallback<METHOD##Reply> &callback) SPECS { \
INVOKE_RETRYABLE_RPC_CALL(retryable_rpc_client, \
SERVICE, \
METHOD, \
request, \
callback, \
rpc_client, \
method_timeout_ms); \
}

/// \class Executor
/// Executor saves an operation and supports retrying it.
class Executor {
Expand All @@ -65,6 +79,17 @@ class Executor {
std::function<void()> operation_;
};

/**
* The client makes RPC calls through the provided underlying grpc client.
* If the call goes through, the user-provided callback is invoked.
* If the call fails due to a transient network error, it is added to a retry queue.
* The client waits for the grpc channel to reconnect and then resends the requests.
* If the total number of request bytes in the queue exceeds max_pending_requests_bytes,
* the thread is blocked until some requests are resent.
* If a call's timeout_ms elapses during retry, its callback is called with
* Status::TimedOut. If the whole client does not reconnect within
* server_unavailable_timeout_seconds, server_unavailable_timeout_callback is invoked.
*/
class RetryableGrpcClient : public std::enable_shared_from_this<RetryableGrpcClient> {
public:
static std::shared_ptr<RetryableGrpcClient> Create(
Expand Down Expand Up @@ -288,11 +313,11 @@ class RetryableGrpcClient : public std::enable_shared_from_this<RetryableGrpcCli

std::shared_ptr<grpc::Channel> channel_;

uint64_t max_pending_requests_bytes_;
uint64_t check_channel_status_interval_milliseconds_;
uint64_t server_unavailable_timeout_seconds_;
const uint64_t max_pending_requests_bytes_;
const uint64_t check_channel_status_interval_milliseconds_;
const uint64_t server_unavailable_timeout_seconds_;
std::function<void()> server_unavailable_timeout_callback_;
std::string server_name_;
const std::string server_name_;

bool server_is_unavailable_ = false;
absl::Time server_last_available_time_ = absl::Now();
Expand Down
19 changes: 7 additions & 12 deletions src/ray/rpc/worker/core_worker_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ class CoreWorkerClient : public std::enable_shared_from_this<CoreWorkerClient>,
std::numeric_limits<uint64_t>::max(),
/*check_channel_status_interval_milliseconds=*/
::RayConfig::instance()
.gcs_client_check_connection_status_interval_milliseconds(),
.grpc_client_check_connection_status_interval_milliseconds(),
/*server_unavailable_timeout_seconds=*/
std::numeric_limits<uint64_t>::max(),
/*server_unavailable_timeout_callback=*/
Expand Down Expand Up @@ -294,17 +294,12 @@ class CoreWorkerClient : public std::enable_shared_from_this<CoreWorkerClient>,
/*method_timeout_ms*/ -1,
override)

/// Hands `request` and `callback` to INVOKE_RETRYABLE_RPC_CALL for the
/// CoreWorkerService ReportGeneratorItemReturns method, using
/// retryable_grpc_client_ together with *grpc_client_.
///
/// \param request The ReportGeneratorItemReturns request to pass on.
/// \param callback The reply callback passed on alongside the request.
void ReportGeneratorItemReturns(
const ReportGeneratorItemReturnsRequest &request,
const ClientCallback<ReportGeneratorItemReturnsReply> &callback) override {
INVOKE_RETRYABLE_RPC_CALL(retryable_grpc_client_,
CoreWorkerService,
ReportGeneratorItemReturns,
request,
callback,
*grpc_client_,
/*method_timeout_ms*/ -1);
}
/// Generates `void ReportGeneratorItemReturns(request, callback) override`,
/// whose body hands the request and callback to INVOKE_RETRYABLE_RPC_CALL with
/// retryable_grpc_client_ and *grpc_client_.
VOID_RETRYABLE_RPC_CLIENT_METHOD(retryable_grpc_client_,
CoreWorkerService,
ReportGeneratorItemReturns,
*grpc_client_,
/*method_timeout_ms*/ -1,
override)

VOID_RPC_CLIENT_METHOD(CoreWorkerService,
RegisterMutableObjectReader,
Expand Down

0 comments on commit bf8577e

Please sign in to comment.