From e5a235503c4f5c9e4e96ba2116e22dbd271dd0a5 Mon Sep 17 00:00:00 2001 From: "helei.sig11" Date: Mon, 20 Mar 2023 20:50:00 +0800 Subject: [PATCH] brpc: support mongo client Signed-off-by: helei.sig11 --- example/mongo_c++/Makefile | 69 +++++++ example/mongo_c++/mongo_press.cpp | 107 ++++++++++ src/brpc/global.cpp | 8 +- src/brpc/mongo.cpp | 316 +++++++++++++++++++++++++++++ src/brpc/mongo.h | 145 +++++++++++++ src/brpc/mongo_head.h | 16 +- src/brpc/policy/mongo.proto | 36 +--- src/brpc/policy/mongo_protocol.cpp | 230 ++++++++++++++++----- src/brpc/policy/mongo_protocol.h | 23 +++ src/brpc/proto_base.proto | 2 + src/butil/bson_util.cc | 2 +- 11 files changed, 869 insertions(+), 85 deletions(-) create mode 100644 example/mongo_c++/Makefile create mode 100644 example/mongo_c++/mongo_press.cpp create mode 100644 src/brpc/mongo.cpp create mode 100644 src/brpc/mongo.h diff --git a/example/mongo_c++/Makefile b/example/mongo_c++/Makefile new file mode 100644 index 0000000000..42ab5cdc3b --- /dev/null +++ b/example/mongo_c++/Makefile @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +BRPC_PATH = ../../ +include $(BRPC_PATH)/config.mk +CXXFLAGS+=$(CPPFLAGS) -std=c++0x -DNDEBUG -O2 -pipe -W -Wall -fPIC -fno-omit-frame-pointer +HDRS+=$(BRPC_PATH)/output/include +LIBS+=$(BRPC_PATH)/output/lib +HDRPATHS = $(addprefix -I, $(HDRS)) +LIBPATHS = $(addprefix -L, $(LIBS)) +COMMA=, +SOPATHS=$(addprefix -Wl$(COMMA)-rpath$(COMMA), $(LIBS)) + +PRESS_SOURCES = mongo_press.cpp + +PRESS_OBJS = $(addsuffix .o, $(basename $(PRESS_SOURCES))) + +ifeq ($(SYSTEM),Darwin) + ifneq ("$(LINK_SO)", "") + STATIC_LINKINGS += -lbrpc + else + # *.a must be explicitly specified in clang + STATIC_LINKINGS += $(BRPC_PATH)/output/lib/libbrpc.a + endif + LINK_OPTIONS_SO = $^ $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS) + LINK_OPTIONS = $^ $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS) +else ifeq ($(SYSTEM),Linux) + STATIC_LINKINGS += -lbrpc + LINK_OPTIONS_SO = -Xlinker "-(" $^ -Xlinker "-)" $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS) + LINK_OPTIONS = -Xlinker "-(" $^ -Wl,-Bstatic $(STATIC_LINKINGS) -Wl,-Bdynamic -Xlinker "-)" $(DYNAMIC_LINKINGS) +endif + +.PHONY:all +all: mongo_press + +.PHONY:clean +clean: + @echo "> Cleaning" + rm -rf redis_press redis_cli $(PRESS_OBJS) $(CLI_OBJS) $(SERVER_OBJS) + +mongo_press:$(PRESS_OBJS) + @echo "> Linking $@" +ifneq ("$(LINK_SO)", "") + $(CXX) $(LIBPATHS) $(SOPATHS) $(LINK_OPTIONS_SO) -o $@ +else + $(CXX) $(LIBPATHS) $(LINK_OPTIONS) -o $@ +endif + +%.o:%.cpp + @echo "> Compiling $@" + $(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@ + +%.o:%.cc + @echo "> Compiling $@" + $(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@ diff --git a/example/mongo_c++/mongo_press.cpp b/example/mongo_c++/mongo_press.cpp new file mode 100644 index 0000000000..1855e5d92b --- /dev/null +++ b/example/mongo_c++/mongo_press.cpp @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// A multi-threaded client getting keys from a redis-server constantly. + +#include +#include +#include +#include +#include +#include +#include +#include + +DEFINE_string(connection_type, "single", + "Connection type. Available values: pooled, short"); +DEFINE_string(server, "127.0.0.1", "IP Address of server"); +DEFINE_int32(port, 27017, "Port of server"); +DEFINE_int32(timeout_ms, 5000, "RPC timeout in milliseconds"); +DEFINE_int32(connect_timeout_ms, 5000, "RPC timeout in milliseconds"); +DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); +DEFINE_string(collection, "test_collection", "collection name"); +DEFINE_string(db, "test_db", "database name"); + +int main(int argc, char* argv[]) { + // Parse gflags. We recommend you to use gflags as well. + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + + // A Channel represents a communication line to a Server. Notice that + // Channel is thread-safe and can be shared by all threads in your program. + brpc::Channel channel; + + // Initialize the channel, NULL means using default options. + brpc::ChannelOptions options; + options.protocol = brpc::PROTOCOL_MONGO; + options.connection_type = FLAGS_connection_type; + options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/; + options.max_retry = FLAGS_max_retry; + if (channel.Init(FLAGS_server.c_str(), FLAGS_port, &options) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + return -1; + } + + brpc::Controller cntl; + butil::bson::UniqueBsonPtr command( + BCON_NEW("insert", BCON_UTF8(FLAGS_collection.c_str()), + "$db", BCON_UTF8(FLAGS_db.c_str()), + "comment", BCON_UTF8("brpc mongo press"))); + + brpc::MongoMessage req; + brpc::MongoMessage resp; + req.set_body(std::move(command)); + req.set_key("documents"); + for (size_t i = 0; i < 10; i++) { + char user_id[64]; + char user_name[64]; + ::snprintf(user_id, sizeof(user_id), "user-%lu", i); + ::snprintf(user_name, sizeof(user_name), "user-name-%lu", i); + req.add_doc_sequence(butil::bson::UniqueBsonPtr(BCON_NEW( + "user", BCON_UTF8(user_id), + "_id", BCON_INT32(i), + "user_name", BCON_UTF8(user_name)))); + } + LOG(INFO) << "MongoRequest: " << req; + channel.CallMethod(nullptr, &cntl, &req, &resp, nullptr); + + if (!cntl.Failed()) { + LOG(INFO) << "OK: \n" << req << "\n" << resp; + } else { + LOG(INFO) << "Failed: \n" << req << "\n" << resp; + LOG(INFO) << cntl.ErrorText(); + return 0; + } + + while (!brpc::IsAskedToQuit()) { + brpc::Controller cntl; + brpc::MongoMessage req; + brpc::MongoMessage resp; + butil::bson::UniqueBsonPtr command( + BCON_NEW("find", BCON_UTF8(FLAGS_collection.c_str()), + "$db", BCON_UTF8(FLAGS_db.c_str()), + "comment", BCON_UTF8("brpc mongo press query"))); + req.set_body(std::move(command)); + channel.CallMethod(nullptr, &cntl, &req, &resp, nullptr); + if (!cntl.Failed()) { + LOG(INFO) << "OK: \n" << req << "\n" << resp; + } else { + LOG(INFO) << cntl.ErrorText(); + } + bthread_usleep(1000*1000); + } + return 0; +} diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp index 30c2f1a3b9..6f507f4a25 100644 --- a/src/brpc/global.cpp +++ b/src/brpc/global.cpp @@ -509,10 +509,12 @@ static void GlobalInitializeOrDieImpl() { } Protocol mongo_protocol = { ParseMongoMessage, - NULL, NULL, - ProcessMongoRequest, NULL, + SerializeMongoRequest, + PackMongoRequest, + ProcessMongoRequest, + ProcessMongoResponse, NULL, NULL, NULL, - CONNECTION_TYPE_POOLED, "mongo" }; + CONNECTION_TYPE_ALL, "mongo" }; if (RegisterProtocol(PROTOCOL_MONGO, mongo_protocol) != 0) { exit(1); } diff --git a/src/brpc/mongo.cpp b/src/brpc/mongo.cpp new file mode 100644 index 0000000000..dbe34abf56 --- /dev/null +++ b/src/brpc/mongo.cpp @@ -0,0 +1,316 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "brpc/mongo.h" + +#include // ReflectionOps::Merge + +#include "brpc/proto_base.pb.h" +#include "butil/logging.h" +#include "mongo_head.h" + +namespace brpc { + +bool MongoMessage::SerializeToIOBuf(butil::IOBuf *buf) const { + // flag bits + uint32_t flag_bits = butil::ByteSwapToLE32(_flag_bits); + buf->append(&flag_bits, sizeof(flag_bits)); + + // body section + uint8_t payload_type = MONGO_PAYLOAD_TYPE_BODY; + buf->append(&payload_type, sizeof(payload_type)); + buf->append(bson_get_data(_body.get()), _body->len); + + // data section + if (_key.empty() || _doc_sequence.empty()) { + return true; + } + if (bson_has_field(_body.get(), _key.c_str())) { + return false; + } + + payload_type = MONGO_PAYLOAD_TYPE_DOC_SEQUENCE; + uint8_t section_length = 4 + _key.size() + 1; + for (const UniqueBsonPtr& doc : _doc_sequence) { + section_length += doc->len; + } + section_length = butil::ByteSwapToLE32(section_length); + buf->append(&payload_type, sizeof(payload_type)); + buf->append(§ion_length, sizeof(section_length)); + buf->append(_key.c_str(), _key.size() + 1); + for (const UniqueBsonPtr& doc : _doc_sequence) { + buf->append(bson_get_data(doc.get()), doc->len); + } + return true; +} + +bool MongoMessage::ParseFromIOBuf(butil::IOBuf* payload) { + bool success = false; + do { + // TODO: support checksum + payload->cutn(&_flag_bits, sizeof(_flag_bits)); + _flag_bits = butil::ByteSwapToLE32(_flag_bits); + if (_flag_bits != 0) { + LOG(WARNING) << "current only zero flag is supported"; + break; + } + + // parse first section + uint8_t payload_type; + payload->cutn(&payload_type, sizeof(payload_type)); + if (payload_type != MONGO_PAYLOAD_TYPE_BODY) { + break; + } + + _body = butil::bson::ExtractBsonFromIOBuf(*payload); + if (!_body) { + break; + } + payload->pop_front(_body->len); + + if (payload->size() == 0) { + success = true; + break; + } + + // parse second section + payload->cutn(&payload_type, sizeof(payload_type)); + if (payload_type != MONGO_PAYLOAD_TYPE_DOC_SEQUENCE) { + break; + } + + uint32_t section_length = 0; + payload->cutn(§ion_length, sizeof(section_length)); + section_length = butil::ByteSwapToLE32(section_length); + if (section_length != sizeof(section_length) + payload->size()) { + break; + } + + char c = '\0'; + while (payload->cut1(&c) && c != '\0') { + _key.push_back(c); + } + if (c == '\0') { + _key.push_back(c); + } else { + break; + } + + butil::bson::BsonEnumerator enumerator(payload); + while (const bson_t* doc = enumerator.Next()) { + _doc_sequence.emplace_back(bson_copy(doc)); + }; + if (!enumerator.HasError()) { + success = true; + } + } while (false); + + return success; +} + +void MongoMessage::Print(std::ostream& os) const { + os << "{"; + do { + if (!_body) { + break; + } + + if (_head.message_length != 0) { + os << " \"message_length\" : " << _head.message_length << ","; + os << " \"request_id\" : " << _head.request_id << ","; + os << " \"response_to\" : " << _head.response_to << ","; + os << " \"op_code\" : " << _head.op_code << ","; + } + + os << " \"flag_bits\" : " << _flag_bits << ","; + os << " \"body\" : "; + char *json = bson_as_json(_body.get(), nullptr); + os << json; + bson_free(json); + + if (_key.empty() || _doc_sequence.empty()) { + break; + } + os << ", \"" << _key << "\" : ["; + size_t i = 0; + for (const UniqueBsonPtr& doc : _doc_sequence) { + json = bson_as_json(doc.get(), nullptr); + os << json; + ++i; + if (i != _doc_sequence.size()) { + os << ", "; + } + bson_free(json); + } + os << "]"; + } while (false); + os << "}"; +} + +MongoMessage::MongoMessage() + : ::google::protobuf::Message() { + SharedCtor(); +} + +MongoMessage::MongoMessage(const MongoMessage& from) + : ::google::protobuf::Message() { + SharedCtor(); + MergeFrom(from); +} + +MongoMessage::~MongoMessage() { + SharedDtor(); +} + +void MongoMessage::SharedCtor() { + _cached_size = 0; + memset(&_head, 0, sizeof(_head)); + _flag_bits = 0; +} + +void MongoMessage::SharedDtor() { +} + +void MongoMessage::SetCachedSize(int size) const { + _cached_size = size; +} + +MongoMessage* MongoMessage::New() const { + return new MongoMessage; +} + +#if GOOGLE_PROTOBUF_VERSION >= 3006000 +MongoMessage* MongoMessage::New(::google::protobuf::Arena* arena) const { + return CreateMaybeMessage(arena); +} +#endif + +void MongoMessage::Clear() { + _cached_size = 0; + memset(&_head, 0, sizeof(_head)); + _flag_bits = 0; + _body.reset(); + _key.clear(); + _doc_sequence.clear(); +} + +bool MongoMessage::MergePartialFromCodedStream( + ::google::protobuf::io::CodedInputStream*) { + LOG(WARNING) << "You're not supposed to parse a MongoMessage"; + return true; +} + +void MongoMessage::SerializeWithCachedSizes( + ::google::protobuf::io::CodedOutputStream*) const { + LOG(WARNING) << "You're not supposed to serialize a MongoMessage"; +} + +::google::protobuf::uint8* MongoMessage::SerializeWithCachedSizesToArray( + ::google::protobuf::uint8* target) const { + LOG(WARNING) << "You're not supposed to serialize a MongoMessage"; + return target; +} + +int MongoMessage::ByteSize() const { + int total_size = 0; + do { + if (!_body) { + break; + } + // header + flag_bits(4) + section_kind(1) + body + total_size += sizeof(_head) + 5 + _body->len; + // section_kind(1) + section_length(4) + doc_sequence + if (!_key.empty() && !_doc_sequence.empty()) { + total_size += 5; + for (const UniqueBsonPtr& doc : _doc_sequence) { + total_size += doc->len; + } + } + } while (false); + _cached_size = total_size; + return total_size; +} + +void MongoMessage::MergeFrom(const ::google::protobuf::Message& from) { + GOOGLE_CHECK_NE(&from, this); + const MongoMessage* source = dynamic_cast(&from); + if (source == NULL) { + ::google::protobuf::internal::ReflectionOps::Merge(from, this); + } else { + MergeFrom(*source); + } +} + +void MongoMessage::MergeFrom(const MongoMessage& from) { + GOOGLE_CHECK_NE(&from, this); + if (_body || !_doc_sequence.empty()) { + LOG(WARNING) << "You're not supposed to merge a non-empty MongoMessage"; + return; + } + _head = from._head; + _flag_bits = from._flag_bits; + _body.reset(bson_copy(from._body.get())); + _key = from._key; + for (const UniqueBsonPtr& doc : from._doc_sequence) { + _doc_sequence.emplace_back(bson_copy(doc.get())); + } + _cached_size = from._cached_size; +} + +void MongoMessage::CopyFrom(const ::google::protobuf::Message& from) { + if (&from == this) return; + Clear(); + MergeFrom(from); +} + +void MongoMessage::CopyFrom(const MongoMessage& from) { + if (&from == this) return; + Clear(); + MergeFrom(from); +} + +bool MongoMessage::IsInitialized() const { + return _body != nullptr; +} + +void MongoMessage::Swap(MongoMessage* other) { + if (other != this) { + std::swap(_head, other->_head); + std::swap(_flag_bits, other->_flag_bits); + std::swap(_body, other->_body); + std::swap(_key, other->_key); + std::swap(_doc_sequence, other->_doc_sequence); + } +} + +const ::google::protobuf::Descriptor* MongoMessage::descriptor() { + return MongoMessageBase::descriptor(); +} + +::google::protobuf::Metadata MongoMessage::GetMetadata() const { + ::google::protobuf::Metadata metadata; + metadata.descriptor = MongoMessage::descriptor(); + metadata.reflection = NULL; + return metadata; +} + +std::ostream& operator<<(std::ostream& os, const MongoMessage& msg) { + msg.Print(os); + return os; +} + +} // namespace brpc diff --git a/src/brpc/mongo.h b/src/brpc/mongo.h new file mode 100644 index 0000000000..63921a6413 --- /dev/null +++ b/src/brpc/mongo.h @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_MONGO_H +#define BRPC_MONGO_H + +#include + +#include "brpc/mongo_head.h" +#include "brpc/pb_compat.h" +#include "brpc/parse_result.h" +#include "butil/iobuf.h" +#include "butil/bson_util.h" + +namespace brpc { + + +class MongoMessage : public ::google::protobuf::Message { +public: + MongoMessage(); + virtual ~MongoMessage(); + MongoMessage(const MongoMessage& from); + MongoMessage& operator=(const MongoMessage& from) { + CopyFrom(from); + return *this; + }; + void Swap(MongoMessage* other); + + mongo_head_t& head() { + return _head; + } + + const mongo_head_t& head() const { + return _head; + } + + const uint32_t& flag_bits() const { + return _flag_bits; + } + + void set_flag_bits(int32_t flag_bits) { + _flag_bits = flag_bits; + } + + const std::string& key() const { + return _key; + } + + void set_key(std::string key) { + _key = std::move(key); + } + + const bson_t* body() const { + return _body.get(); + } + + void set_body(bson_t* body) { + _body.reset(bson_copy(body)); + } + + void set_body(butil::bson::UniqueBsonPtr&& body) { + _body.reset(body.release()); + } + + bson_t* doc_sequence(size_t i) { + return _doc_sequence[i].get(); + } + + size_t doc_sequence_size() { + return _doc_sequence.size(); + } + + void add_doc_sequence() { + _doc_sequence.emplace_back(bson_new()); + } + + void add_doc_sequence(butil::bson::UniqueBsonPtr&& doc) { + _doc_sequence.emplace_back(std::move(doc)); + } + + // Serialize/Parse |Payload| from IOBuf + bool ParseFromIOBuf(butil::IOBuf* payload); + bool SerializeToIOBuf(butil::IOBuf* buf) const; + + // Protobuf methods. + MongoMessage* New() const PB_319_OVERRIDE; +#if GOOGLE_PROTOBUF_VERSION >= 3006000 + MongoMessage* New(::google::protobuf::Arena* arena) const override; +#endif + void CopyFrom(const ::google::protobuf::Message& from) PB_321_OVERRIDE; + void MergeFrom(const ::google::protobuf::Message& from) override; + void CopyFrom(const MongoMessage& from); + void MergeFrom(const MongoMessage& from); + void Clear() override; + bool IsInitialized() const override; + + int ByteSize() const; + bool MergePartialFromCodedStream( + ::google::protobuf::io::CodedInputStream* input) PB_310_OVERRIDE; + void SerializeWithCachedSizes( + ::google::protobuf::io::CodedOutputStream* output) const PB_310_OVERRIDE; + ::google::protobuf::uint8* SerializeWithCachedSizesToArray(::google::protobuf::uint8* output) const PB_310_OVERRIDE; + int GetCachedSize() const override { return _cached_size; } + + static const ::google::protobuf::Descriptor* descriptor(); + + void Print(std::ostream&) const; + +protected: + ::google::protobuf::Metadata GetMetadata() const override; + +private: + using UniqueBsonPtr = butil::bson::UniqueBsonPtr; + + void SharedCtor(); + void SharedDtor(); + virtual void SetCachedSize(int size) const override; + + mongo_head_t _head; + uint32_t _flag_bits; + UniqueBsonPtr _body; + std::string _key; + std::vector _doc_sequence; + + mutable int _cached_size; // ByteSize +}; + +std::ostream& operator<<(std::ostream& os, const MongoMessage&); + +} // namespace brpc +#endif // BRPC_MONGO_H diff --git a/src/brpc/mongo_head.h b/src/brpc/mongo_head.h index b9da0171f2..732b94bec5 100644 --- a/src/brpc/mongo_head.h +++ b/src/brpc/mongo_head.h @@ -28,19 +28,33 @@ namespace brpc { // https://docs.mongodb.org/manual/reference/mongodb-wire-protocol/#request-opcodes enum MongoOpCode { MONGO_OPCODE_REPLY = 1, - MONGO_OPCODE_MSG = 1000, MONGO_OPCODE_UPDATE = 2001, MONGO_OPCODE_INSERT = 2002, MONGO_OPCODE_QUERY = 2004, MONGO_OPCODE_GET_MORE = 2005, MONGO_OPCODE_DELETE = 2006, MONGO_OPCODE_KILL_CURSORS = 2007, + MONGO_OPCODE_COMPRESSED = 2012, + MONGO_OPCODE_MSG = 2013, +}; + +enum MongoMsgFlag { + MONGO_MSG_FLAG_CHECKSUM_PRESENT = 1 << 0, + MONGO_MSG_FLAG_MORE_TO_COME = 1 << 2, + MONGO_MSG_FLAG_EXHAUST_ALLOWED = 1 << 3, +}; + +enum MongoPayloadType { + MONGO_PAYLOAD_TYPE_BODY = 0, + MONGO_PAYLOAD_TYPE_DOC_SEQUENCE = 1, + MONGO_PAYLOAD_TYPE_INTERNAL = 2, }; inline bool is_mongo_opcode(int32_t op_code) { switch (op_code) { case MONGO_OPCODE_REPLY: return true; case MONGO_OPCODE_MSG: return true; + case MONGO_OPCODE_COMPRESSED: return true; case MONGO_OPCODE_UPDATE: return true; case MONGO_OPCODE_INSERT: return true; case MONGO_OPCODE_QUERY: return true; diff --git a/src/brpc/policy/mongo.proto b/src/brpc/policy/mongo.proto index 87b839b3ad..1646935757 100644 --- a/src/brpc/policy/mongo.proto +++ b/src/brpc/policy/mongo.proto @@ -24,40 +24,8 @@ option java_generic_services = true; option java_package="com.brpc.policy"; option java_outer_classname="MongoProto"; -enum MongoOp { - OPREPLY = 1; - DBMSG = 1000; - DB_UPDATE = 2001; - DB_INSERT = 2002; - DB_QUERY = 2004; - DB_GETMORE = 2005; - DB_DELETE = 2006; - DB_KILLCURSORS = 2007; - DB_COMMAND = 2008; - DB_COMMANDREPLY = 2009; -} - -message MongoHeader { - required int32 message_length = 1; - required int32 request_id = 2; - required int32 response_to = 3; - required MongoOp op_code = 4; -} - -message MongoRequest { - required MongoHeader header = 1; - required string message = 2; -} - -message MongoResponse { - required MongoHeader header = 1; - required int32 response_flags = 2; - required int64 cursor_id = 3; - required int32 starting_from = 4; - required int32 number_returned = 5; - required string message = 6; -} +import "brpc/proto_base.proto"; service MongoService { - rpc default_method(MongoRequest) returns (MongoResponse); + rpc default_method(brpc.MongoMessageBase) returns (brpc.MongoMessageBase); } diff --git a/src/brpc/policy/mongo_protocol.cpp b/src/brpc/policy/mongo_protocol.cpp index 82bb3e0b36..174b9d3e9c 100644 --- a/src/brpc/policy/mongo_protocol.cpp +++ b/src/brpc/policy/mongo_protocol.cpp @@ -24,12 +24,13 @@ #include "brpc/socket.h" // Socket #include "brpc/server.h" // Server #include "brpc/span.h" +#include "brpc/mongo.h" #include "brpc/mongo_head.h" #include "brpc/details/server_private_accessor.h" #include "brpc/details/controller_private_accessor.h" #include "brpc/mongo_service_adaptor.h" #include "brpc/policy/most_common_message.h" -#include "brpc/policy/nshead_protocol.h" +#include "brpc/policy/mongo_protocol.h" #include "brpc/policy/mongo.pb.h" #include "brpc/details/usercode_backup_pool.h" @@ -41,6 +42,44 @@ void bthread_assign_data(void* data); namespace brpc { namespace policy { +class MongoStreamData : public StreamUserData { +public: + // @StreamUserData + virtual void DestroyStreamUserData(SocketUniquePtr& sending_sock, + Controller* cntl, + int error_code, + bool end_of_rpc) override; + + void set_request_id(uint32_t request_id) { + _request_id = request_id; + } + + void set_correlation_id(uint64_t correlation_id) { + _correlation_id = correlation_id; + } + + uint32_t request_id() const { + return _request_id; + } + uint64_t correlation_id() const { + return _correlation_id; + } + +private: + uint64_t _correlation_id; + uint32_t _request_id; +}; + +class MongoStreamCreator : public StreamCreator { +public: + // @StreamCreator + virtual StreamUserData* OnCreatingStream(SocketUniquePtr* inout, + Controller* cntl) override; + // @StreamCreator + virtual void DestroyStreamCreator(Controller* cntl) override; +}; + + struct SendMongoResponse : public google::protobuf::Closure { SendMongoResponse(const Server *server) : status(NULL), @@ -53,8 +92,8 @@ struct SendMongoResponse : public google::protobuf::Closure { int64_t received_us; const Server *server; Controller cntl; - MongoRequest req; - MongoResponse res; + MongoMessage req; + MongoMessage res; }; SendMongoResponse::~SendMongoResponse() { @@ -75,26 +114,15 @@ void SendMongoResponse::Run() { server->options().mongo_service_adaptor; butil::IOBuf res_buf; if (cntl.Failed()) { - adaptor->SerializeError(res.header().response_to(), &res_buf); - } else if (res.has_message()) { - mongo_head_t header = { - res.header().message_length(), - res.header().request_id(), - res.header().response_to(), - res.header().op_code() - }; - res_buf.append(static_cast(&header), sizeof(mongo_head_t)); - int32_t response_flags = res.response_flags(); - int64_t cursor_id = res.cursor_id(); - int32_t starting_from = res.starting_from(); - int32_t number_returned = res.number_returned(); - res_buf.append(&response_flags, sizeof(response_flags)); - res_buf.append(&cursor_id, sizeof(cursor_id)); - res_buf.append(&starting_from, sizeof(starting_from)); - res_buf.append(&number_returned, sizeof(number_returned)); - res_buf.append(res.message()); + adaptor->SerializeError(res.head().response_to, &res_buf); + } else if (res.IsInitialized()) { + mongo_head_t head = res.head(); + head.make_host_endian(); + res_buf.append(&head, sizeof(head)); + res.SerializeToIOBuf(&res_buf); } + // TODO: handle compress if (!res_buf.empty()) { // Have the risk of unlimited pending responses, in which case, tell // users to set max_concurrency. @@ -110,18 +138,19 @@ void SendMongoResponse::Run() { ParseResult ParseMongoMessage(butil::IOBuf* source, Socket* socket, bool /*read_eof*/, const void *arg) { const Server* server = static_cast(arg); - const MongoServiceAdaptor* adaptor = server->options().mongo_service_adaptor; - if (NULL == adaptor) { + const MongoServiceAdaptor* adaptor = + server ? server->options().mongo_service_adaptor : nullptr; + if (server && !adaptor) { // The server does not enable mongo adaptor. return MakeParseError(PARSE_ERROR_TRY_OTHERS); } - char buf[sizeof(mongo_head_t)]; - const char *p = (const char *)source->fetch(buf, sizeof(buf)); - if (NULL == p) { + if (source->size() < sizeof(mongo_head_t)) { return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); } - mongo_head_t header = *(const mongo_head_t*)p; + + mongo_head_t header; + source->copy_to(&header, sizeof(header)); header.make_host_endian(); if (!is_mongo_opcode(header.op_code)) { // The op_code plays the role of "magic number" here. @@ -137,13 +166,14 @@ ParseResult ParseMongoMessage(butil::IOBuf* source, } else if (source->length() < body_len) { return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); } + // Mongo protocol is a protocol with state. Each connection has its own // mongo context. (e.g. last error occured on the connection, the cursor // created by the last Query). The context is stored in // socket::_input_message, and created at the first time when msg // comes over the socket. Destroyable *socket_context_msg = socket->parsing_context(); - if (NULL == socket_context_msg) { + if (NULL == socket_context_msg && server) { MongoContext *context = adaptor->CreateSocketContext(); if (NULL == context) { return MakeParseError(PARSE_ERROR_NO_RESOURCE); @@ -152,15 +182,55 @@ ParseResult ParseMongoMessage(butil::IOBuf* source, socket->reset_parsing_context(socket_context_msg); } policy::MostCommonMessage* msg = policy::MostCommonMessage::Get(); - source->cutn(&msg->meta, sizeof(buf)); - size_t act_body_len = source->cutn(&msg->payload, body_len - sizeof(buf)); - if (act_body_len != body_len - sizeof(buf)) { + source->cutn(&msg->meta, sizeof(header)); + size_t act_body_len = source->cutn(&msg->payload, body_len - sizeof(header)); + if (act_body_len != body_len - sizeof(header)) { CHECK(false); // Very unlikely, unless memory is corrupted. return MakeParseError(PARSE_ERROR_TRY_OTHERS); } return MakeMessage(msg); } +void SerializeMongoRequest(butil::IOBuf* buf, + Controller* cntl, + const google::protobuf::Message* request) { + if (request == NULL) { + return cntl->SetFailed(EREQUEST, "Request is NULL"); + } + if (request->GetDescriptor() != brpc::MongoMessage::descriptor()) { + return cntl->SetFailed(EREQUEST, "The request is not a MongoMessage"); + } + const brpc::MongoMessage* mm = static_cast(request); + if (mm->ByteSize() == 0) { + return cntl->SetFailed(EREQUEST, "request byte size is empty"); + } + if (!mm->SerializeToIOBuf(buf)) { + return cntl->SetFailed(EREQUEST, "failed to serialize request"); + } + cntl->set_stream_creator(butil::get_leaky_singleton()); +} + +void PackMongoRequest(butil::IOBuf *req_buf, + SocketMessage** user_message, + uint64_t correlation_id, + const google::protobuf::MethodDescriptor*, + Controller* cntl, + const butil::IOBuf& request_body, + const Authenticator* auth) { + ControllerPrivateAccessor accessor(cntl); + MongoStreamData *stream_data = static_cast(accessor.get_stream_user_data()); + stream_data->set_correlation_id(correlation_id); + // TODO(helei): handle compress + mongo_head_t head; + head.message_length = sizeof(mongo_head_t) + request_body.size(); + head.request_id = stream_data->request_id(); + head.response_to = 0; + head.op_code = MONGO_OPCODE_MSG; + head.make_host_endian(); + req_buf->append(&head, sizeof(head)); + req_buf->append(request_body); +} + // Defined in baidu_rpc_protocol.cpp void EndRunningCallMethodInPool( ::google::protobuf::Service* service, @@ -183,13 +253,13 @@ void ProcessMongoRequest(InputMessageBase* msg_base) { const google::protobuf::ServiceDescriptor* srv_des = MongoService::descriptor(); if (1 != srv_des->method_count()) { - LOG(WARNING) << "method count:" << srv_des->method_count() - << " of MongoService should be equal to 1!"; + LOG(WARNING) << "method count: " << srv_des->method_count() + << "of MongoService should be equal to 1!"; } - const Server::MethodProperty *mp = - ServerPrivateAccessor(server) - .FindMethodPropertyByFullName(srv_des->method(0)->full_name()); + const Server::MethodProperty* mp = + ServerPrivateAccessor(server) + .FindMethodPropertyByFullName(srv_des->method(0)->full_name()); MongoContextMessage *context_msg = dynamic_cast(socket->parsing_context()); @@ -237,7 +307,7 @@ void ProcessMongoRequest(InputMessageBase* msg_base) { if (NULL == mp || mp->service->GetDescriptor() == BadMethodService::descriptor()) { - mongo_done->cntl.SetFailed(ENOMETHOD, "Fail to find default_method"); + mongo_done->cntl.SetFailed(ENOMETHOD, "Failed to find default_method"); break; } // Switch to service-specific error. @@ -254,20 +324,13 @@ void ProcessMongoRequest(InputMessageBase* msg_base) { } } - if (!MongoOp_IsValid(header->op_code)) { + if (!is_mongo_opcode(header->op_code)) { mongo_done->cntl.SetFailed(EREQUEST, "Unknown op_code:%d", header->op_code); break; } mongo_done->cntl.set_log_id(header->request_id); - const std::string &body_str = msg->payload.to_string(); - mongo_done->req.set_message(body_str.c_str(), body_str.size()); - mongo_done->req.mutable_header()->set_message_length(header->message_length); - mongo_done->req.mutable_header()->set_request_id(header->request_id); - mongo_done->req.mutable_header()->set_response_to(header->response_to); - mongo_done->req.mutable_header()->set_op_code( - static_cast(header->op_code)); - mongo_done->res.mutable_header()->set_response_to(header->request_id); + mongo_done->req.head() = *header; mongo_done->received_us = msg->received_us(); google::protobuf::Service* svc = mp->service; @@ -294,5 +357,80 @@ void ProcessMongoRequest(InputMessageBase* msg_base) { mongo_done->Run(); } +void MongoStreamData::DestroyStreamUserData(SocketUniquePtr& sending_sock, + Controller* cntl, + int error_code, + bool end_of_rpc) { + butil::ResourceId slot{ _request_id }; + butil::return_resource(slot); +} + +StreamUserData* MongoStreamCreator::OnCreatingStream(SocketUniquePtr* inout, + Controller* cntl) { + butil::ResourceId slot; + MongoStreamData *stream_data = butil::get_resource(&slot); + stream_data->set_request_id(slot.value); + return stream_data; +} + +void MongoStreamCreator::DestroyStreamCreator(Controller* cntl) { + // MongoStreamCreator is a global singleton value, Don't delete + // it in this function. +} + +void ProcessMongoResponse(InputMessageBase* msg_base) { + const int64_t start_parse_us = butil::cpuwide_time_us(); + DestroyingPtr msg(static_cast(msg_base)); + mongo_head_t head; + msg->meta.copy_to(&head, sizeof(head)); + head.make_host_endian(); + butil::ResourceId slot{ head.response_to }; + MongoStreamData* stream_data = butil::address_resource(slot); + Controller* cntl = NULL; + if (!stream_data) { + LOG(ERROR) << "failed to address stream data"; + } + const bthread_id_t cid = { stream_data->correlation_id() }; + const int rc = bthread_id_lock(cid, (void**)&cntl); + if (rc != 0) { + LOG_IF(ERROR, rc != EINVAL && rc != EPERM) + << "Fail to lock correlation_id=" << cid << ": " << berror(rc); + return; + } + + ControllerPrivateAccessor accessor(cntl); + Span* span = accessor.span(); + if (span) { + span->set_base_real_us(msg->base_real_us()); + span->set_received_us(msg->received_us()); + span->set_response_size(msg->meta.size() + msg->payload.size() + 12); + span->set_start_parse_us(start_parse_us); + } + + const int saved_error = cntl->ErrorCode(); + do { + if (cntl->response() == NULL) { + break; + } + if (cntl->response()->GetDescriptor() != MongoMessage::descriptor()) { + cntl->SetFailed(ERESPONSE, "must be mongo response"); + break; + } + MongoMessage* rsp = dynamic_cast(cntl->response()); + if (!rsp) { + cntl->SetFailed(ERESPONSE, "must be mongo response"); + break; + } + msg->meta.copy_to(&rsp->head(), sizeof(rsp->head())); + rsp->head().make_host_endian(); + if (!rsp->ParseFromIOBuf(&msg->payload)) { + cntl->SetFailed(ERESPONSE, "failed to parse mongo response"); + break; + } + } while (false); + msg.reset(); + accessor.OnResponse(cid, saved_error); +} + } // namespace policy } // namespace brpc diff --git a/src/brpc/policy/mongo_protocol.h b/src/brpc/policy/mongo_protocol.h index 3b8e6c44c3..842a4a821f 100644 --- a/src/brpc/policy/mongo_protocol.h +++ b/src/brpc/policy/mongo_protocol.h @@ -20,6 +20,8 @@ #include "brpc/protocol.h" #include "brpc/input_messenger.h" +#include "brpc/socket_message.h" +#include "butil/iobuf.h" namespace brpc { @@ -28,9 +30,30 @@ namespace policy { // Parse binary format of mongo ParseResult ParseMongoMessage(butil::IOBuf* source, Socket* socket, bool read_eof, const void *arg); + // Actions to a (client) request in mongo format void ProcessMongoRequest(InputMessageBase* msg); +void PackMongoRequest(butil::IOBuf *buf, + SocketMessage**, + uint64_t correlation_id, + const google::protobuf::MethodDescriptor* method, + Controller* controller, + const butil::IOBuf& request, + const Authenticator* auth); + +// Actions to a (server) response in mongo format. +void ProcessMongoResponse(InputMessageBase* msg); + + +void SerializeMongoRequest(butil::IOBuf* buf, + Controller* cntl, + const google::protobuf::Message* request); + +const std::string& GetMongoMethodName( + const google::protobuf::MethodDescriptor*, + const Controller*); + } // namespace policy } // namespace brpc diff --git a/src/brpc/proto_base.proto b/src/brpc/proto_base.proto index c0bbc086e1..c7d5df096a 100644 --- a/src/brpc/proto_base.proto +++ b/src/brpc/proto_base.proto @@ -24,6 +24,8 @@ message RedisResponseBase {} message EspMessageBase {} +message MongoMessageBase {} + message MemcacheRequestBase {} message MemcacheResponseBase {} diff --git a/src/butil/bson_util.cc b/src/butil/bson_util.cc index 0b7cb63786..901ca243d2 100644 --- a/src/butil/bson_util.cc +++ b/src/butil/bson_util.cc @@ -46,7 +46,7 @@ BsonEnumerator::~BsonEnumerator() { UniqueBsonPtr ExtractBsonFromIOBuf(IOBuf& iobuf) { uint32_t bson_length; const size_t n = iobuf.copy_to(&bson_length, sizeof(bson_length)); - if (n < sizeof(bson_length) || iobuf.size() < bson_length + sizeof(bson_length)) { + if (n < sizeof(bson_length) || iobuf.size() < bson_length) { return nullptr; } std::unique_ptr buffer(new uint8_t[bson_length]);