diff --git a/docs/cn/server.md b/docs/cn/server.md index 5e3fc42a6f..c9d56debad 100644 --- a/docs/cn/server.md +++ b/docs/cn/server.md @@ -1027,7 +1027,9 @@ Server默认使用`DefaultRpcPBMessageFactory`。它是一个简单的工厂类 class RpcPBMessages { public: virtual ~RpcPBMessages() = default; + // Get protobuf request message. virtual google::protobuf::Message* Request() = 0; + // Get protobuf response message. virtual google::protobuf::Message* Response() = 0; }; @@ -1035,12 +1037,21 @@ public: class RpcPBMessageFactory { public: virtual ~RpcPBMessageFactory() = default; + + // Get `RpcPBMessages' according to `service' and `method'. + // Common practice to create protobuf message: + // service.GetRequestPrototype(&method).New() -> request; + // service.GetRequestPrototype(&method).New() -> response. virtual RpcPBMessages* Get(const ::google::protobuf::Service& service, const ::google::protobuf::MethodDescriptor& method) = 0; - virtual void Return(RpcPBMessages* protobuf_message) = 0; + // Return `RpcPBMessages' to factory. + virtual void Return(RpcPBMessages* messages) = 0; }; ``` +### Protobuf arena + + # FAQ ### Q: Fail to write into fd=1865 SocketId=8905@10.208.245.43:54742@8230: Got EOF是什么意思 diff --git a/docs/en/server.md b/docs/en/server.md index 62480d2801..2469f4e4ba 100644 --- a/docs/en/server.md +++ b/docs/en/server.md @@ -1008,6 +1008,41 @@ public: ... ``` +## RPC Protobuf message factory + +`DefaultRpcPBMessageFactory' is used at server-side by default. It is a simple factory class that uses `new' to create request/response messages and `delete' to destroy request/response messages. + +Users can implement `RpcPBMessages' (encapsulation of request/response message) and `RpcPBMessageFactory' (factory class) to customize the creation and destruction mechanism of protobuf message, and then set to `ServerOptions.rpc_pb_message_factory'. + +The interface is as follows: + +```c++ +// Inherit this class to customize rpc protobuf messages, +// include request and response. +class RpcPBMessages { +public: + virtual ~RpcPBMessages() = default; + // Get protobuf request message. + virtual google::protobuf::Message* Request() = 0; + // Get protobuf response message. + virtual google::protobuf::Message* Response() = 0; +}; + +// Factory to manage `RpcPBMessages'. +class RpcPBMessageFactory { +public: + virtual ~RpcPBMessageFactory() = default; + // Get `RpcPBMessages' according to `service' and `method'. + // Common practice to create protobuf message: + // service.GetRequestPrototype(&method).New() -> request; + // service.GetRequestPrototype(&method).New() -> response. + virtual RpcPBMessages* Get(const ::google::protobuf::Service& service, + const ::google::protobuf::MethodDescriptor& method) = 0; + // Return `RpcPBMessages' to factory. + virtual void Return(RpcPBMessages* protobuf_message) = 0; +}; +``` + # FAQ ### Q: Fail to write into fd=1865 SocketId=8905@10.208.245.43:54742@8230: Got EOF diff --git a/src/brpc/rpc_pb_message_factory.cpp b/src/brpc/rpc_pb_message_factory.cpp index 828a289d8d..27e2457e05 100644 --- a/src/brpc/rpc_pb_message_factory.cpp +++ b/src/brpc/rpc_pb_message_factory.cpp @@ -16,7 +16,6 @@ // under the License. #include "brpc/rpc_pb_message_factory.h" -#include "butil/object_pool.h" namespace brpc { diff --git a/src/brpc/rpc_pb_message_factory.h b/src/brpc/rpc_pb_message_factory.h index 0da0ff2a67..22d39671ed 100644 --- a/src/brpc/rpc_pb_message_factory.h +++ b/src/brpc/rpc_pb_message_factory.h @@ -21,6 +21,8 @@ #include #include #include +#include +#include "butil/object_pool.h" namespace brpc { @@ -29,7 +31,9 @@ namespace brpc { class RpcPBMessages { public: virtual ~RpcPBMessages() = default; + // Get protobuf request message. virtual google::protobuf::Message* Request() = 0; + // Get protobuf response message. virtual google::protobuf::Message* Response() = 0; }; @@ -37,9 +41,14 @@ class RpcPBMessages { class RpcPBMessageFactory { public: virtual ~RpcPBMessageFactory() = default; + // Get `RpcPBMessages' according to `service' and `method'. + // Common practice to create protobuf message: + // service.GetRequestPrototype(&method).New() -> request; + // service.GetRequestPrototype(&method).New() -> response. virtual RpcPBMessages* Get(const ::google::protobuf::Service& service, const ::google::protobuf::MethodDescriptor& method) = 0; - virtual void Return(RpcPBMessages* protobuf_message) = 0; + // Return `RpcPBMessages' to factory. + virtual void Return(RpcPBMessages* messages) = 0; }; class DefaultRpcPBMessageFactory : public RpcPBMessageFactory { @@ -49,6 +58,80 @@ class DefaultRpcPBMessageFactory : public RpcPBMessageFactory { void Return(RpcPBMessages* messages) override; }; +namespace internal { + +// Allocate protobuf message from arena. +// The arena is created with `StartBlockSize' and `MaxBlockSize' options. +// For more details, see `google::protobuf::ArenaOptions'. +template +struct ArenaRpcPBMessages : public RpcPBMessages { + struct ArenaOptionsWrapper { + public: + ArenaOptionsWrapper() { + options.start_block_size = StartBlockSize; + options.max_block_size = MaxBlockSize; + } + + private: + friend struct ArenaRpcPBMessages; + ::google::protobuf::ArenaOptions options; + }; + + explicit ArenaRpcPBMessages(ArenaOptionsWrapper options_wrapper) + : arena(options_wrapper.options) + , request(NULL) + , response(NULL) {} + + ::google::protobuf::Message* Request() override { return request; } + ::google::protobuf::Message* Response() override { return response; } + + ::google::protobuf::Arena arena; + ::google::protobuf::Message* request; + ::google::protobuf::Message* response; +}; + +template +class ArenaRpcPBMessageFactory : public RpcPBMessageFactory { + typedef ::brpc::internal::ArenaRpcPBMessages + ArenaRpcPBMessages; +public: + ArenaRpcPBMessageFactory() { + _arena_options.start_block_size = StartBlockSize; + _arena_options.max_block_size = MaxBlockSize; + } + + RpcPBMessages* Get(const ::google::protobuf::Service& service, + const ::google::protobuf::MethodDescriptor& method) override { + typename ArenaRpcPBMessages::ArenaOptionsWrapper options_wrapper; + auto messages = butil::get_object(options_wrapper); + messages->request = service.GetRequestPrototype(&method).New(&messages->arena); + messages->response = service.GetResponsePrototype(&method).New(&messages->arena); + return messages; + } + + void Return(RpcPBMessages* messages) override { + auto arena_messages = static_cast(messages); + arena_messages->request = NULL; + arena_messages->response = NULL; + butil::return_object(arena_messages); + } + +private: + ::google::protobuf::ArenaOptions _arena_options; +}; + +} + +template +RpcPBMessageFactory* GetArenaRpcPBMessageFactory() { + return new ::brpc::internal::ArenaRpcPBMessageFactory(); +} + +BUTIL_FORCE_INLINE RpcPBMessageFactory* GetArenaRpcPBMessageFactory() { + // Default arena options, same as `google::protobuf::ArenaOptions'. + return GetArenaRpcPBMessageFactory<256, 8192>(); +} + } // namespace brpc -#endif //BRPC_RPC_PB_MESSAGE_FACTORY_H +#endif // BRPC_RPC_PB_MESSAGE_FACTORY_H diff --git a/test/brpc_server_unittest.cpp b/test/brpc_server_unittest.cpp index 5f06887a52..f53298ce02 100644 --- a/test/brpc_server_unittest.cpp +++ b/test/brpc_server_unittest.cpp @@ -56,6 +56,7 @@ #include "echo.pb.h" #include "v1.pb.h" #include "v2.pb.h" +#include "v3.pb.h" int main(int argc, char* argv[]) { testing::InitGoogleTest(&argc, argv); @@ -350,6 +351,18 @@ class EchoServiceV2 : public v2::EchoService { butil::atomic ncalled; }; +class EchoServiceV3 : public v3::EchoService { +public: + void Echo(::google::protobuf::RpcController*, + const v3::EchoRequest* request, + v3::EchoResponse* response, + ::google::protobuf::Closure* done) override { + brpc::ClosureGuard done_guard(done); + ASSERT_EQ(request->message(), EXP_REQUEST); + response->set_message(EXP_RESPONSE); + } +}; + TEST_F(ServerTest, empty_enabled_protocols) { butil::EndPoint ep; ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep)); @@ -1815,14 +1828,45 @@ TEST_F(ServerTest, rpc_pb_message_factory) { brpc::ChannelOptions copt; copt.protocol = "baidu_std"; ASSERT_EQ(0, chan.Init(ep, &copt)); - brpc::Controller cntl; - test::EchoRequest req; - test::EchoResponse res; - req.set_message(EXP_REQUEST); - test::EchoService_Stub stub(&chan); - stub.Echo(&cntl, &req, &res, NULL); - ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); - ASSERT_EQ(EXP_RESPONSE, res.message()); + for (int i = 0; i < 1000; ++i) { + brpc::Controller cntl; + test::EchoRequest req; + test::EchoResponse res; + req.set_message(EXP_REQUEST); + test::EchoService_Stub stub(&chan); + stub.Echo(&cntl, &req, &res, NULL); + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); + ASSERT_EQ(EXP_RESPONSE, res.message()); + } + + ASSERT_EQ(0, server.Stop(0)); + ASSERT_EQ(0, server.Join()); +} + +TEST_F(ServerTest, arena_rpc_pb_message_factory) { + butil::EndPoint ep; + ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep)); + brpc::Server server; + EchoServiceV3 service; + ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); + brpc::ServerOptions opt; + opt.rpc_pb_message_factory = brpc::GetArenaRpcPBMessageFactory(); + ASSERT_EQ(0, server.Start(ep, &opt)); + + brpc::Channel chan; + brpc::ChannelOptions copt; + copt.protocol = "baidu_std"; + ASSERT_EQ(0, chan.Init(ep, &copt)); + for (int i = 0; i < 1000; ++i) { + brpc::Controller cntl; + v3::EchoRequest req; + v3::EchoResponse res; + req.set_message(EXP_REQUEST); + v3::EchoService_Stub stub(&chan); + stub.Echo(&cntl, &req, &res, NULL); + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); + ASSERT_EQ(EXP_RESPONSE, res.message()); + } ASSERT_EQ(0, server.Stop(0)); ASSERT_EQ(0, server.Join()); diff --git a/test/v3.proto b/test/v3.proto new file mode 100644 index 0000000000..b920832a0d --- /dev/null +++ b/test/v3.proto @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax="proto3"; + +package v3; + +option cc_generic_services = true; +// https://github.com/protocolbuffers/protobuf/releases/tag/v3.14.0 +// Since Protocol Buffers v3.14.0, Arenas are now unconditionally enabled. +// cc_enable_arenas no longer has any effect. +option cc_enable_arenas = true; + +message EchoRequest { + string message = 1; +}; + +message EchoResponse { + string message = 1; +}; + +service EchoService { + rpc Echo(EchoRequest) returns (EchoResponse); +};