From d5fa9a04ed841ea845887f43e06a0d2a81216c2d Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 16 Oct 2024 17:29:55 -0700 Subject: [PATCH] [Core][RFC] RPC network error chaos test framework (#48007) Signed-off-by: Jiajun Yao --- BUILD.bazel | 15 +++ python/ray/tests/test_gcs_utils.py | 14 +++ src/ray/common/ray_config_def.h | 5 + src/ray/core_worker/core_worker_process.cc | 1 + src/ray/gcs/gcs_server/gcs_server_main.cc | 1 + src/ray/raylet/main.cc | 1 + src/ray/rpc/grpc_client.h | 52 +++++++--- src/ray/rpc/rpc_chaos.cc | 109 +++++++++++++++++++++ src/ray/rpc/rpc_chaos.h | 37 +++++++ src/ray/rpc/test/rpc_chaos_test.cc | 34 +++++++ 10 files changed, 258 insertions(+), 11 deletions(-) create mode 100644 src/ray/rpc/rpc_chaos.cc create mode 100644 src/ray/rpc/rpc_chaos.h create mode 100644 src/ray/rpc/test/rpc_chaos_test.cc diff --git a/BUILD.bazel b/BUILD.bazel index 83660e6aa8cad..7f497e3f6de1f 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -93,8 +93,10 @@ ray_cc_library( "src/ray/rpc/common.cc", "src/ray/rpc/grpc_server.cc", "src/ray/rpc/server_call.cc", + "src/ray/rpc/rpc_chaos.cc", ], hdrs = glob([ + "src/ray/rpc/rpc_chaos.h", "src/ray/rpc/client_call.h", "src/ray/rpc/common.h", "src/ray/rpc/grpc_client.h", @@ -1551,6 +1553,19 @@ ray_cc_test( ], ) +ray_cc_test( + name = "rpc_chaos_test", + size = "small", + srcs = [ + "src/ray/rpc/test/rpc_chaos_test.cc", + ], + tags = ["team:core"], + deps = [ + ":grpc_common_lib", + "@com_google_googletest//:gtest_main", + ], +) + ray_cc_test( name = "core_worker_client_pool_test", size = "small", diff --git a/python/ray/tests/test_gcs_utils.py b/python/ray/tests/test_gcs_utils.py index 82f34214046ef..c25beac6e598a 100644 --- a/python/ray/tests/test_gcs_utils.py +++ b/python/ray/tests/test_gcs_utils.py @@ -100,6 +100,20 @@ def test_kv_timeout(ray_start_regular): gcs_client.internal_kv_del(b"A", True, b"NS", timeout=2) +def test_kv_transient_network_error(shutdown_only, monkeypatch): + monkeypatch.setenv( + 
"RAY_testing_rpc_failure", + "ray::rpc::InternalKVGcsService.grpc_client.InternalKVGet=5," + "ray::rpc::InternalKVGcsService.grpc_client.InternalKVPut=5", + ) + ray.init() + gcs_address = ray._private.worker.global_worker.gcs_client.address + gcs_client = ray._raylet.GcsClient(address=gcs_address, nums_reconnect_retry=0) + + gcs_client.internal_kv_put(b"A", b"Hello", True, b"") + assert gcs_client.internal_kv_get(b"A", b"") == b"Hello" + + @pytest.mark.asyncio async def test_kv_basic_aio(ray_start_regular): gcs_client = gcs_utils.GcsAioClient( diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index bcc960158a078..674895d4b3892 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -835,6 +835,11 @@ RAY_CONFIG(std::string, REDIS_SERVER_NAME, "") // it will apply to all methods. RAY_CONFIG(std::string, testing_asio_delay_us, "") +/// To use this, simply do +/// export +/// RAY_testing_rpc_failure="method1=max_num_failures,method2=max_num_failures" +RAY_CONFIG(std::string, testing_rpc_failure, "") + /// The following are configs for the health check. They are borrowed /// from k8s health probe (shorturl.at/jmTY3) /// The delay to send the first health check. 
diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc index 191788e7e0458..30042635dee7c 100644 --- a/src/ray/core_worker/core_worker_process.cc +++ b/src/ray/core_worker/core_worker_process.cc @@ -238,6 +238,7 @@ void CoreWorkerProcessImpl::InitializeSystemConfig() { RayConfig::instance().initialize(promise.get_future().get()); ray::asio::testing::init(); + ray::rpc::testing::init(); } void CoreWorkerProcessImpl::RunWorkerTaskExecutionLoop() { diff --git a/src/ray/gcs/gcs_server/gcs_server_main.cc b/src/ray/gcs/gcs_server/gcs_server_main.cc index 137efbaf9dd5d..18d7b83d896e4 100644 --- a/src/ray/gcs/gcs_server/gcs_server_main.cc +++ b/src/ray/gcs/gcs_server/gcs_server_main.cc @@ -70,6 +70,7 @@ int main(int argc, char *argv[]) { RayConfig::instance().initialize(config_list); ray::asio::testing::init(); + ray::rpc::testing::init(); // IO Service for main loop. instrumented_io_context main_service; diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 8cc0c1f08ef1a..2b30f9068b6b6 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -275,6 +275,7 @@ int main(int argc, char *argv[]) { RAY_CHECK(stored_raylet_config.has_value()); RayConfig::instance().initialize(*stored_raylet_config); ray::asio::testing::init(); + ray::rpc::testing::init(); // Core worker tries to kill child processes when it exits. 
But they can't do // it perfectly: if the core worker is killed by SIGKILL, the child processes diff --git a/src/ray/rpc/grpc_client.h b/src/ray/rpc/grpc_client.h index 9037613968ceb..d86475fbdf6b5 100644 --- a/src/ray/rpc/grpc_client.h +++ b/src/ray/rpc/grpc_client.h @@ -23,6 +23,7 @@ #include "ray/common/status.h" #include "ray/rpc/client_call.h" #include "ray/rpc/common.h" +#include "ray/rpc/rpc_chaos.h" namespace ray { namespace rpc { @@ -148,15 +149,43 @@ class GrpcClient { const ClientCallback &callback, std::string call_name = "UNKNOWN_RPC", int64_t method_timeout_ms = -1) { - auto call = client_call_manager_.CreateCall( - *stub_, - prepare_async_function, - request, - callback, - std::move(call_name), - method_timeout_ms); - RAY_CHECK(call != nullptr); - call_method_invoked_ = true; + testing::RpcFailure failure = testing::get_rpc_failure(call_name); + if (failure == testing::RpcFailure::Request) { + // Simulate the case where the RPC fails before server receives + // the request. + RAY_LOG(INFO) << "Inject RPC request failure for " << call_name; + client_call_manager_.GetMainService().post( + [callback]() { + callback(Status::RpcError("Unavailable", grpc::StatusCode::UNAVAILABLE), + Reply()); + }, + "RpcChaos"); + } else if (failure == testing::RpcFailure::Response) { + // Simulate the case where the RPC fails after server sends + // the response. 
+ RAY_LOG(INFO) << "Inject RPC response failure for " << call_name; + client_call_manager_.CreateCall( + *stub_, + prepare_async_function, + request, + [callback](const Status &status, Reply &&reply) { + callback(Status::RpcError("Unavailable", grpc::StatusCode::UNAVAILABLE), + Reply()); + }, + std::move(call_name), + method_timeout_ms); + } else { + auto call = client_call_manager_.CreateCall( + *stub_, + prepare_async_function, + request, + callback, + std::move(call_name), + method_timeout_ms); + RAY_CHECK(call != nullptr); + } + + call_method_invoked_.store(true); } std::shared_ptr Channel() const { return channel_; } @@ -167,7 +196,8 @@ class GrpcClient { /// Also see https://grpc.github.io/grpc/core/md_doc_connectivity-semantics-and-api.html /// for channel connectivity state machine. bool IsChannelIdleAfterRPCs() const { - return (channel_->GetState(false) == GRPC_CHANNEL_IDLE) && call_method_invoked_; + return (channel_->GetState(false) == GRPC_CHANNEL_IDLE) && + call_method_invoked_.load(); } private: @@ -179,7 +209,7 @@ class GrpcClient { /// The channel of the stub. std::shared_ptr channel_; /// Whether CallMethod is invoked. - bool call_method_invoked_ = false; + std::atomic call_method_invoked_ = false; }; } // namespace rpc diff --git a/src/ray/rpc/rpc_chaos.cc b/src/ray/rpc/rpc_chaos.cc new file mode 100644 index 0000000000000..373e3a9be60f0 --- /dev/null +++ b/src/ray/rpc/rpc_chaos.cc @@ -0,0 +1,109 @@ +// Copyright 2024 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/rpc/rpc_chaos.h" + +#include +#include + +#include "absl/synchronization/mutex.h" +#include "ray/common/ray_config.h" + +namespace ray { +namespace rpc { +namespace testing { +namespace { + +/* + RpcFailureManager is a simple chaos testing framework. Before starting ray, users + should set up os environment to use this feature for testing purposes. + To use this, simply do + export RAY_testing_rpc_failure="method1=3,method2=5" + Key is the RPC call name and value is the max number of failures to inject. +*/ +class RpcFailureManager { + public: + RpcFailureManager() { Init(); } + + void Init() { + absl::MutexLock lock(&mu_); + + failable_methods_.clear(); + + if (!RayConfig::instance().testing_rpc_failure().empty()) { + for (const auto &item : + absl::StrSplit(RayConfig::instance().testing_rpc_failure(), ",")) { + std::vector parts = absl::StrSplit(item, "="); + RAY_CHECK_EQ(parts.size(), 2UL); + failable_methods_.emplace(parts[0], std::atoi(parts[1].c_str())); + } + + std::random_device rd; + auto seed = rd(); + RAY_LOG(INFO) << "Setting RpcFailureManager seed to " << seed; + gen_.seed(seed); + } + } + + RpcFailure GetRpcFailure(const std::string &name) { + absl::MutexLock lock(&mu_); + + if (failable_methods_.find(name) == failable_methods_.end()) { + return RpcFailure::None; + } + + uint64_t &num_remaining_failures = failable_methods_.at(name); + if (num_remaining_failures == 0) { + return RpcFailure::None; + } + + std::uniform_int_distribution dist(0, 3); + int rand = dist(gen_); + if (rand == 0) { + // 25% chance + num_remaining_failures--; + return RpcFailure::Request; + } else if (rand == 1) { + // 25% chance + num_remaining_failures--; + return RpcFailure::Response; + } else { + // 50% chance + return RpcFailure::None; + } + } + + private: + absl::Mutex mu_; + std::mt19937 gen_; + // call name -> # remaining failures + 
std::unordered_map failable_methods_ ABSL_GUARDED_BY(&mu_); +}; + +static RpcFailureManager _rpc_failure_manager; + +} // namespace + +RpcFailure get_rpc_failure(const std::string &name) { + if (RayConfig::instance().testing_rpc_failure().empty()) { + return RpcFailure::None; + } + return _rpc_failure_manager.GetRpcFailure(name); +} + +void init() { _rpc_failure_manager.Init(); } + +} // namespace testing +} // namespace rpc +} // namespace ray diff --git a/src/ray/rpc/rpc_chaos.h b/src/ray/rpc/rpc_chaos.h new file mode 100644 index 0000000000000..cb0e614eead9f --- /dev/null +++ b/src/ray/rpc/rpc_chaos.h @@ -0,0 +1,37 @@ +// Copyright 2024 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace ray { +namespace rpc { +namespace testing { + +enum class RpcFailure { + None, + // Failure before server receives the request + Request, + // Failure after server sends the response + Response, +}; + +RpcFailure get_rpc_failure(const std::string &name); + +void init(); + +} // namespace testing +} // namespace rpc +} // namespace ray diff --git a/src/ray/rpc/test/rpc_chaos_test.cc b/src/ray/rpc/test/rpc_chaos_test.cc new file mode 100644 index 0000000000000..75bced2592537 --- /dev/null +++ b/src/ray/rpc/test/rpc_chaos_test.cc @@ -0,0 +1,34 @@ +// Copyright 2024 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/rpc/rpc_chaos.h" + +#include + +#include "gtest/gtest.h" +#include "ray/common/ray_config.h" + +TEST(RpcChaosTest, Basic) { + RayConfig::instance().testing_rpc_failure() = "method1=0,method2=1"; + ray::rpc::testing::init(); + ASSERT_EQ(ray::rpc::testing::get_rpc_failure("unknown"), + ray::rpc::testing::RpcFailure::None); + ASSERT_EQ(ray::rpc::testing::get_rpc_failure("method1"), + ray::rpc::testing::RpcFailure::None); + // At most one failure. + ASSERT_FALSE(ray::rpc::testing::get_rpc_failure("method2") != + ray::rpc::testing::RpcFailure::None && + ray::rpc::testing::get_rpc_failure("method2") != + ray::rpc::testing::RpcFailure::None); +}