Skip to content

Commit

Permalink
Support arena rpc pb message factory
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright committed Sep 1, 2024
1 parent 48996cf commit 4fad729
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 12 deletions.
13 changes: 12 additions & 1 deletion docs/cn/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -1027,20 +1027,31 @@ Server默认使用`DefaultRpcPBMessageFactory`。它是一个简单的工厂类
class RpcPBMessages {
public:
virtual ~RpcPBMessages() = default;
// Get protobuf request message.
virtual google::protobuf::Message* Request() = 0;
// Get protobuf response message.
virtual google::protobuf::Message* Response() = 0;
};
// Factory to manage `RpcPBMessages'.
class RpcPBMessageFactory {
public:
virtual ~RpcPBMessageFactory() = default;
// Get `RpcPBMessages' according to `service' and `method'.
// Common practice to create protobuf message:
// service.GetRequestPrototype(&method).New() -> request;
// service.GetRequestPrototype(&method).New() -> response.
virtual RpcPBMessages* Get(const ::google::protobuf::Service& service,
const ::google::protobuf::MethodDescriptor& method) = 0;
virtual void Return(RpcPBMessages* protobuf_message) = 0;
// Return `RpcPBMessages' to factory.
virtual void Return(RpcPBMessages* messages) = 0;
};
```

### Protobuf arena


# FAQ

### Q: Fail to write into fd=1865 [email protected]:54742@8230: Got EOF是什么意思
Expand Down
35 changes: 35 additions & 0 deletions docs/en/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,41 @@ public:
...
```
## RPC Protobuf message factory
`DefaultRpcPBMessageFactory' is used at server-side by default. It is a simple factory class that uses `new' to create request/response messages and `delete' to destroy request/response messages.
Users can implement `RpcPBMessages' (encapsulation of request/response message) and `RpcPBMessageFactory' (factory class) to customize the creation and destruction mechanism of protobuf message, and then set to `ServerOptions.rpc_pb_message_factory'.
The interface is as follows:
```c++
// Inherit this class to customize rpc protobuf messages,
// include request and response.
class RpcPBMessages {
public:
virtual ~RpcPBMessages() = default;
// Get protobuf request message.
virtual google::protobuf::Message* Request() = 0;
// Get protobuf response message.
virtual google::protobuf::Message* Response() = 0;
};
// Factory to manage `RpcPBMessages'.
class RpcPBMessageFactory {
public:
virtual ~RpcPBMessageFactory() = default;
// Get `RpcPBMessages' according to `service' and `method'.
// Common practice to create protobuf message:
// service.GetRequestPrototype(&method).New() -> request;
// service.GetRequestPrototype(&method).New() -> response.
virtual RpcPBMessages* Get(const ::google::protobuf::Service& service,
const ::google::protobuf::MethodDescriptor& method) = 0;
// Return `RpcPBMessages' to factory.
virtual void Return(RpcPBMessages* protobuf_message) = 0;
};
```

# FAQ

### Q: Fail to write into fd=1865 [email protected]:54742@8230: Got EOF
Expand Down
1 change: 0 additions & 1 deletion src/brpc/rpc_pb_message_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

#include "brpc/rpc_pb_message_factory.h"
#include "butil/object_pool.h"

namespace brpc {

Expand Down
87 changes: 85 additions & 2 deletions src/brpc/rpc_pb_message_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <google/protobuf/service.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/message.h>
#include <google/protobuf/arena.h>
#include "butil/object_pool.h"

namespace brpc {

Expand All @@ -29,17 +31,24 @@ namespace brpc {
class RpcPBMessages {
public:
virtual ~RpcPBMessages() = default;
// Get protobuf request message.
virtual google::protobuf::Message* Request() = 0;
// Get protobuf response message.
virtual google::protobuf::Message* Response() = 0;
};

// Factory to manage `RpcPBMessages'.
class RpcPBMessageFactory {
public:
virtual ~RpcPBMessageFactory() = default;
// Get `RpcPBMessages' according to `service' and `method'.
// Common practice to create protobuf message:
// service.GetRequestPrototype(&method).New() -> request;
// service.GetRequestPrototype(&method).New() -> response.
virtual RpcPBMessages* Get(const ::google::protobuf::Service& service,
const ::google::protobuf::MethodDescriptor& method) = 0;
virtual void Return(RpcPBMessages* protobuf_message) = 0;
// Return `RpcPBMessages' to factory.
virtual void Return(RpcPBMessages* messages) = 0;
};

class DefaultRpcPBMessageFactory : public RpcPBMessageFactory {
Expand All @@ -49,6 +58,80 @@ class DefaultRpcPBMessageFactory : public RpcPBMessageFactory {
void Return(RpcPBMessages* messages) override;
};

namespace internal {

// Allocate protobuf message from arena.
// The arena is created with `StartBlockSize' and `MaxBlockSize' options.
// For more details, see `google::protobuf::ArenaOptions'.
template<size_t StartBlockSize, size_t MaxBlockSize>
struct ArenaRpcPBMessages : public RpcPBMessages {
struct ArenaOptionsWrapper {
public:
ArenaOptionsWrapper() {
options.start_block_size = StartBlockSize;
options.max_block_size = MaxBlockSize;
}

private:
friend struct ArenaRpcPBMessages;
::google::protobuf::ArenaOptions options;
};

explicit ArenaRpcPBMessages(ArenaOptionsWrapper options_wrapper)
: arena(options_wrapper.options)
, request(NULL)
, response(NULL) {}

::google::protobuf::Message* Request() override { return request; }
::google::protobuf::Message* Response() override { return response; }

::google::protobuf::Arena arena;
::google::protobuf::Message* request;
::google::protobuf::Message* response;
};

template<size_t StartBlockSize, size_t MaxBlockSize>
class ArenaRpcPBMessageFactory : public RpcPBMessageFactory {
typedef ::brpc::internal::ArenaRpcPBMessages<StartBlockSize, MaxBlockSize>
ArenaRpcPBMessages;
public:
ArenaRpcPBMessageFactory() {
_arena_options.start_block_size = StartBlockSize;
_arena_options.max_block_size = MaxBlockSize;
}

RpcPBMessages* Get(const ::google::protobuf::Service& service,
const ::google::protobuf::MethodDescriptor& method) override {
typename ArenaRpcPBMessages::ArenaOptionsWrapper options_wrapper;
auto messages = butil::get_object<ArenaRpcPBMessages>(options_wrapper);
messages->request = service.GetRequestPrototype(&method).New(&messages->arena);
messages->response = service.GetResponsePrototype(&method).New(&messages->arena);
return messages;
}

void Return(RpcPBMessages* messages) override {
auto arena_messages = static_cast<ArenaRpcPBMessages*>(messages);
arena_messages->request = NULL;
arena_messages->response = NULL;
butil::return_object(arena_messages);
}

private:
::google::protobuf::ArenaOptions _arena_options;
};

}

template<size_t StartBlockSize, size_t MaxBlockSize>
RpcPBMessageFactory* GetArenaRpcPBMessageFactory() {
return new ::brpc::internal::ArenaRpcPBMessageFactory<StartBlockSize, MaxBlockSize>();
}

BUTIL_FORCE_INLINE RpcPBMessageFactory* GetArenaRpcPBMessageFactory() {
// Default arena options, same as `google::protobuf::ArenaOptions'.
return GetArenaRpcPBMessageFactory<256, 8192>();
}

} // namespace brpc

#endif //BRPC_RPC_PB_MESSAGE_FACTORY_H
#endif // BRPC_RPC_PB_MESSAGE_FACTORY_H
60 changes: 52 additions & 8 deletions test/brpc_server_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#include "echo.pb.h"
#include "v1.pb.h"
#include "v2.pb.h"
#include "v3.pb.h"

int main(int argc, char* argv[]) {
testing::InitGoogleTest(&argc, argv);
Expand Down Expand Up @@ -350,6 +351,18 @@ class EchoServiceV2 : public v2::EchoService {
butil::atomic<int> ncalled;
};

class EchoServiceV3 : public v3::EchoService {
public:
void Echo(::google::protobuf::RpcController*,
const v3::EchoRequest* request,
v3::EchoResponse* response,
::google::protobuf::Closure* done) override {
brpc::ClosureGuard done_guard(done);
ASSERT_EQ(request->message(), EXP_REQUEST);
response->set_message(EXP_RESPONSE);
}
};

TEST_F(ServerTest, empty_enabled_protocols) {
butil::EndPoint ep;
ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep));
Expand Down Expand Up @@ -1815,14 +1828,45 @@ TEST_F(ServerTest, rpc_pb_message_factory) {
brpc::ChannelOptions copt;
copt.protocol = "baidu_std";
ASSERT_EQ(0, chan.Init(ep, &copt));
brpc::Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(EXP_REQUEST);
test::EchoService_Stub stub(&chan);
stub.Echo(&cntl, &req, &res, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(EXP_RESPONSE, res.message());
for (int i = 0; i < 1000; ++i) {
brpc::Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(EXP_REQUEST);
test::EchoService_Stub stub(&chan);
stub.Echo(&cntl, &req, &res, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(EXP_RESPONSE, res.message());
}

ASSERT_EQ(0, server.Stop(0));
ASSERT_EQ(0, server.Join());
}

TEST_F(ServerTest, arena_rpc_pb_message_factory) {
butil::EndPoint ep;
ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep));
brpc::Server server;
EchoServiceV3 service;
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
brpc::ServerOptions opt;
opt.rpc_pb_message_factory = brpc::GetArenaRpcPBMessageFactory();
ASSERT_EQ(0, server.Start(ep, &opt));

brpc::Channel chan;
brpc::ChannelOptions copt;
copt.protocol = "baidu_std";
ASSERT_EQ(0, chan.Init(ep, &copt));
for (int i = 0; i < 1000; ++i) {
brpc::Controller cntl;
v3::EchoRequest req;
v3::EchoResponse res;
req.set_message(EXP_REQUEST);
v3::EchoService_Stub stub(&chan);
stub.Echo(&cntl, &req, &res, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(EXP_RESPONSE, res.message());
}

ASSERT_EQ(0, server.Stop(0));
ASSERT_EQ(0, server.Join());
Expand Down
38 changes: 38 additions & 0 deletions test/v3.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

syntax="proto3";

package v3;

option cc_generic_services = true;
// https://github.com/protocolbuffers/protobuf/releases/tag/v3.14.0
// Since Protocol Buffers v3.14.0, Arenas are now unconditionally enabled.
// cc_enable_arenas no longer has any effect.
option cc_enable_arenas = true;

message EchoRequest {
string message = 1;
};

message EchoResponse {
string message = 1;
};

service EchoService {
rpc Echo(EchoRequest) returns (EchoResponse);
};

0 comments on commit 4fad729

Please sign in to comment.