Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mongo client support #2171

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/install-essential-dependences/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ runs:
steps:
- run: ulimit -c unlimited -S && sudo bash -c "echo 'core.%e.%p' > /proc/sys/kernel/core_pattern"
shell: bash
- run: sudo apt-get install -y git g++ make libssl-dev libgflags-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev libgoogle-perftools-dev
- run: sudo apt-get install -y git g++ make libssl-dev libgflags-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev libgoogle-perftools-dev libbson-dev
shell: bash
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*.so
*.dylib
*.rej
*.patch
/output
/test/output
build/
Expand Down
7 changes: 7 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ endif()

find_package(Protobuf REQUIRED)
find_package(Threads REQUIRED)
find_package (bson-1.0 1.7 REQUIRED)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个是不是做成一个可选的编译选项比较好

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, 整个mongo的client支持都做成一个编译选项吧


find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h)
find_library(LEVELDB_LIB NAMES leveldb)
Expand Down Expand Up @@ -210,11 +211,16 @@ endif()

find_package(OpenSSL)

find_package(bson-1.0 REQUIRED)

get_target_property(BSON_INCLUDE_DIRS mongo::bson_shared INTERFACE_INCLUDE_DIRECTORIES)

include_directories(
${GFLAGS_INCLUDE_PATH}
${PROTOBUF_INCLUDE_DIRS}
${LEVELDB_INCLUDE_PATH}
${OPENSSL_INCLUDE_DIR}
${BSON_INCLUDE_DIRS}
)

set(DYNAMIC_LIB
Expand Down Expand Up @@ -293,6 +299,7 @@ set(BUTIL_SOURCES
${PROJECT_SOURCE_DIR}/src/butil/atomicops_internals_x86_gcc.cc
${PROJECT_SOURCE_DIR}/src/butil/base64.cc
${PROJECT_SOURCE_DIR}/src/butil/big_endian.cc
${PROJECT_SOURCE_DIR}/src/butil/bson_util.cc
${PROJECT_SOURCE_DIR}/src/butil/cpu.cc
${PROJECT_SOURCE_DIR}/src/butil/debug/alias.cc
${PROJECT_SOURCE_DIR}/src/butil/debug/asan_invalid_access.cc
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ BUTIL_SOURCES = \
src/butil/atomicops_internals_x86_gcc.cc \
src/butil/base64.cc \
src/butil/big_endian.cc \
src/butil/bson_util.cc \
src/butil/cpu.cc \
src/butil/debug/alias.cc \
src/butil/debug/asan_invalid_access.cc \
Expand Down
15 changes: 11 additions & 4 deletions config_brpc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ OPENSSL_LIB=$(find_dir_of_lib ssl)
# Inconvenient to check these headers in baidu-internal
#PTHREAD_HDR=$(find_dir_of_header_or_die pthread.h)
OPENSSL_HDR=$(find_dir_of_header_or_die openssl/ssl.h mesalink/openssl/ssl.h)
BSON_HDR=$(find_dir_of_header bson.h)

if [ $WITH_MESALINK != 0 ]; then
MESALINK_HDR=$(find_dir_of_header_or_die mesalink/openssl/ssl.h)
Expand Down Expand Up @@ -199,17 +200,19 @@ if [ "$SYSTEM" = "Darwin" ]; then
DYNAMIC_LINKINGS="$DYNAMIC_LINKINGS -Wl,-U,_RegisterThriftProtocol"
fi
append_linking() {
#convert lib-a.b.c to lib_a_b_c
local lib_name=`echo $2 | sed 's/-\|\./_/g'`
if [ -f $1/lib${2}.a ]; then
if [ "$SYSTEM" = "Darwin" ]; then
# *.a must be explicitly specified in clang
STATIC_LINKINGS="$STATIC_LINKINGS $1/lib${2}.a"
else
STATIC_LINKINGS="$STATIC_LINKINGS -l$2"
fi
export STATICALLY_LINKED_$2=1
export STATICALLY_LINKED_$lib_name=1
else
DYNAMIC_LINKINGS="$DYNAMIC_LINKINGS -l$2"
export STATICALLY_LINKED_$2=0
export STATICALLY_LINKED_$lib_name=0
fi
}

Expand All @@ -219,6 +222,10 @@ append_linking $GFLAGS_LIB gflags
PROTOBUF_LIB=$(find_dir_of_lib_or_die protobuf)
append_linking $PROTOBUF_LIB protobuf

# namespace c, grep it from source.
BSON_LIB=$(find_dir_of_lib_or_die bson-1.0)
append_linking $BSON_LIB bson-1.0

LEVELDB_LIB=$(find_dir_of_lib_or_die leveldb)
# required by leveldb
if [ -f $LEVELDB_LIB/libleveldb.a ]; then
Expand Down Expand Up @@ -263,7 +270,7 @@ fi
PROTOBUF_HDR=$(find_dir_of_header_or_die google/protobuf/message.h)
LEVELDB_HDR=$(find_dir_of_header_or_die leveldb/db.h)

HDRS=$($ECHO "$GFLAGS_HDR\n$PROTOBUF_HDR\n$LEVELDB_HDR\n$OPENSSL_HDR" | sort | uniq)
HDRS=$($ECHO "$GFLAGS_HDR\n$PROTOBUF_HDR\n$LEVELDB_HDR\n$OPENSSL_HDR\n$BSON_HDR" | sort | uniq)
LIBS=$($ECHO "$GFLAGS_LIB\n$PROTOBUF_LIB\n$LEVELDB_LIB\n$OPENSSL_LIB\n$SNAPPY_LIB" | sort | uniq)

absent_in_the_list() {
Expand Down Expand Up @@ -373,7 +380,7 @@ fi
append_to_output "CPPFLAGS=${CPPFLAGS}"
append_to_output "# without the flag, linux+arm64 may crash due to folding on TLS.
ifeq (\$(CC),gcc)
ifeq (\$(shell uname -p),aarch64)
ifeq (\$(shell uname -p),aarch64)
CPPFLAGS+=-fno-gcse
endif
endif
Expand Down
69 changes: 69 additions & 0 deletions example/mongo_c++/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

BRPC_PATH = ../../
include $(BRPC_PATH)/config.mk
CXXFLAGS+=$(CPPFLAGS) -std=c++0x -DNDEBUG -O2 -pipe -W -Wall -fPIC -fno-omit-frame-pointer
HDRS+=$(BRPC_PATH)/output/include
LIBS+=$(BRPC_PATH)/output/lib
HDRPATHS = $(addprefix -I, $(HDRS))
LIBPATHS = $(addprefix -L, $(LIBS))
COMMA=,
SOPATHS=$(addprefix -Wl$(COMMA)-rpath$(COMMA), $(LIBS))

PRESS_SOURCES = mongo_press.cpp

PRESS_OBJS = $(addsuffix .o, $(basename $(PRESS_SOURCES)))

ifeq ($(SYSTEM),Darwin)
ifneq ("$(LINK_SO)", "")
STATIC_LINKINGS += -lbrpc
else
# *.a must be explicitly specified in clang
STATIC_LINKINGS += $(BRPC_PATH)/output/lib/libbrpc.a
endif
LINK_OPTIONS_SO = $^ $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS)
LINK_OPTIONS = $^ $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS)
else ifeq ($(SYSTEM),Linux)
STATIC_LINKINGS += -lbrpc
LINK_OPTIONS_SO = -Xlinker "-(" $^ -Xlinker "-)" $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS)
LINK_OPTIONS = -Xlinker "-(" $^ -Wl,-Bstatic $(STATIC_LINKINGS) -Wl,-Bdynamic -Xlinker "-)" $(DYNAMIC_LINKINGS)
endif

.PHONY:all
all: mongo_press

.PHONY:clean
clean:
@echo "> Cleaning"
rm -rf redis_press redis_cli $(PRESS_OBJS) $(CLI_OBJS) $(SERVER_OBJS)

mongo_press:$(PRESS_OBJS)
@echo "> Linking $@"
ifneq ("$(LINK_SO)", "")
$(CXX) $(LIBPATHS) $(SOPATHS) $(LINK_OPTIONS_SO) -o $@
else
$(CXX) $(LIBPATHS) $(LINK_OPTIONS) -o $@
endif

%.o:%.cpp
@echo "> Compiling $@"
$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@

%.o:%.cc
@echo "> Compiling $@"
$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
107 changes: 107 additions & 0 deletions example/mongo_c++/mongo_press.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// A multi-threaded client getting keys from a redis-server constantly.

#include <gflags/gflags.h>
#include <bthread/bthread.h>
#include <butil/logging.h>
#include <butil/string_printf.h>
#include <bvar/bvar.h>
#include <brpc/channel.h>
#include <brpc/server.h>
#include <brpc/mongo.h>

DEFINE_string(connection_type, "single",
"Connection type. Available values: pooled, short");
DEFINE_string(server, "127.0.0.1", "IP Address of server");
DEFINE_int32(port, 27017, "Port of server");
DEFINE_int32(timeout_ms, 5000, "RPC timeout in milliseconds");
DEFINE_int32(connect_timeout_ms, 5000, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_string(collection, "test_collection", "collection name");
DEFINE_string(db, "test_db", "database name");

int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);

// A Channel represents a communication line to a Server. Notice that
// Channel is thread-safe and can be shared by all threads in your program.
brpc::Channel channel;

// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_MONGO;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
options.max_retry = FLAGS_max_retry;
if (channel.Init(FLAGS_server.c_str(), FLAGS_port, &options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return -1;
}

brpc::Controller cntl;
butil::bson::UniqueBsonPtr command(
BCON_NEW("insert", BCON_UTF8(FLAGS_collection.c_str()),
"$db", BCON_UTF8(FLAGS_db.c_str()),
"comment", BCON_UTF8("brpc mongo press")));

brpc::MongoMessage req;
brpc::MongoMessage resp;
req.set_body(std::move(command));
req.set_key("documents");
for (size_t i = 0; i < 10; i++) {
char user_id[64];
char user_name[64];
::snprintf(user_id, sizeof(user_id), "user-%lu", i);
::snprintf(user_name, sizeof(user_name), "user-name-%lu", i);
req.add_doc_sequence(butil::bson::UniqueBsonPtr(BCON_NEW(
"user", BCON_UTF8(user_id),
"_id", BCON_INT32(i),
"user_name", BCON_UTF8(user_name))));
}
LOG(INFO) << "MongoRequest: " << req;
channel.CallMethod(nullptr, &cntl, &req, &resp, nullptr);

if (!cntl.Failed()) {
LOG(INFO) << "OK: \n" << req << "\n" << resp;
} else {
LOG(INFO) << "Failed: \n" << req << "\n" << resp;
LOG(INFO) << cntl.ErrorText();
return 0;
}

while (!brpc::IsAskedToQuit()) {
brpc::Controller cntl;
brpc::MongoMessage req;
brpc::MongoMessage resp;
butil::bson::UniqueBsonPtr command(
BCON_NEW("find", BCON_UTF8(FLAGS_collection.c_str()),
"$db", BCON_UTF8(FLAGS_db.c_str()),
"comment", BCON_UTF8("brpc mongo press query")));
req.set_body(std::move(command));
channel.CallMethod(nullptr, &cntl, &req, &resp, nullptr);
if (!cntl.Failed()) {
LOG(INFO) << "OK: \n" << req << "\n" << resp;
} else {
LOG(INFO) << cntl.ErrorText();
}
bthread_usleep(1000*1000);
}
return 0;
}
8 changes: 5 additions & 3 deletions src/brpc/global.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -509,10 +509,12 @@ static void GlobalInitializeOrDieImpl() {
}

Protocol mongo_protocol = { ParseMongoMessage,
NULL, NULL,
ProcessMongoRequest, NULL,
SerializeMongoRequest,
PackMongoRequest,
ProcessMongoRequest,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProcessMongoRequest还要保留吗?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我觉得可以不支持server端了,mongo协议变动挺大的。要做前后兼容工作量有点大

ProcessMongoResponse,
NULL, NULL, NULL,
CONNECTION_TYPE_POOLED, "mongo" };
CONNECTION_TYPE_ALL, "mongo" };
if (RegisterProtocol(PROTOCOL_MONGO, mongo_protocol) != 0) {
exit(1);
}
Expand Down
Loading