2021SC@SDUSC BRPC源码分析(十二)thrift
thrift是应用较广的RPC框架,最初由Facebook发布,后交由Apache维护。为了和thrift服务互通,同时解决thrift原生方案在多线程安全、易用性、并发能力等方面的一系列问题,brpc实现并支持thrift在NonBlocking模式下的协议(FramedProtocol), 下文均直接称为thrift协议。
相比使用原生方案的优势有:
为了复用解析代码,brpc对thrift的支持仍需要依赖thrift库以及thrift生成的代码。
brpc默认不启用thrift支持也不需要thrift依赖。但如果需用thrift协议, 配置brpc环境的时候需加上–with-thrift或-DWITH_THRIFT=ON.
Linux下安装thrift依赖 先参考官方wiki安装好必备的依赖和工具,然后从官网下载thrift源代码,解压编译。
wget http://www.apache.org/dist/thrift/0.11.0/thrift-0.11.0.tar.gz tar -xf thrift-0.11.0.tar.gz cd thrift-0.11.0/ ./configure --prefix=/usr --with-ruby=no --with-python=no --with-java=no --with-go=no --with-perl=no --with-php=no --with-csharp=no --with-erlang=no --with-lua=no --with-nodejs=no make CPPFLAGS=-DFORCE_BOOST_SMART_PTR -j 4 -s sudo make install
配置brpc支持thrift协议后make。编译完成后会生成libbrpc.a, 其中包含了支持thrift协议的扩展代码, 像正常使用brpc的代码一样链接即可。
# Ubuntu sh config_brpc.sh --headers=/usr/include --libs=/usr/lib --with-thrift # Fedora/CentOS sh config_brpc.sh --headers=/usr/include --libs=/usr/lib64 --with-thrift # Or use cmake mkdir build && cd build && cmake ../ -DWITH_THRIFT=1
#include <brpc/channel.h> #include <brpc/thrift_message.h> // 定义了ThriftStub ... DEFINE_string(server, "0.0.0.0:8019", "IP Address of thrift server"); DEFINE_string(load_balancer, "", "The algorithm for load balancing"); ... brpc::ChannelOptions options; options.protocol = brpc::PROTOCOL_THRIFT; brpc::Channel thrift_channel; if (thrift_channel.Init(Flags_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) { LOG(ERROR) << "Fail to initialize thrift channel"; return -1; } brpc::ThriftStub stub(&thrift_channel); ... // example::[EchoRequest/EchoResponse]是thrift生成的消息 example::EchoRequest req; example::EchoResponse res; req.data = "hello"; stub.CallMethod("Echo", &cntl, &req, &res, NULL); if (cntl.Failed()) { LOG(ERROR) << "Fail to send thrift request, " << cntl.ErrorText(); return -1; }
class ThriftService : public Describable { public: ThriftService(); virtual ~ThriftService(); virtual void ProcessThriftFramedRequest( Controller* controller, ThriftFramedMessage* request, ThriftFramedMessage* response, ::google::protobuf::Closure* done) = 0; void Describe(std::ostream &os, const DescribeOptions&) const; private: DISALLOW_COPY_AND_ASSIGN(ThriftService); friend class policy::ThriftClosure; friend void policy::ProcessThriftRequest(InputMessageBase* msg_base); friend class StatusService; friend class Server; private: void Expose(const butil::StringPiece& prefix); MethodStatus* _status; }; }
namespace brpc { class ThriftStub; static const int16_t THRIFT_INVALID_FID = -1; static const int16_t THRIFT_REQUEST_FID = 1; static const int16_t THRIFT_RESPONSE_FID = 0; class ThriftMessageBase { public: …… 略 …… }; class ThriftFramedMessage : public ::google::protobuf::Message { friend class ThriftStub; public: butil::IOBuf body; int16_t field_id; private: bool _own_raw_instance; ThriftMessageBase* _raw_instance; public: …… 略 …… protected: …… 略 …… private: …… 略 …… }; class ThriftStub { public: …… 略 …… private: ChannelBase* _channel; }; namespace details { template <typename T> class ThriftMessageWrapper final : public ThriftMessageBase { public: ThriftMessageWrapper() : msg_ptr(NULL) {} ThriftMessageWrapper(T* msg2) : msg_ptr(msg2) {} virtual ~ThriftMessageWrapper() {} uint32_t Read(::apache::thrift::protocol::TProtocol* iprot) override final { return msg_ptr->T::read(iprot); } uint32_t Write(::apache::thrift::protocol::TProtocol* oprot) const override final { return msg_ptr->T::write(oprot); } T* msg_ptr; }; template <typename T> class ThriftMessageHolder final : public ThriftMessageBase { public: virtual ~ThriftMessageHolder() {} uint32_t Read(::apache::thrift::protocol::TProtocol* iprot) override final { return msg.T::read(iprot); } uint32_t Write(::apache::thrift::protocol::TProtocol* oprot) const override final { return msg.T::write(oprot); } T msg; }; template <typename RESPONSE> class ThriftDoneWrapper : public ::google::protobuf::Closure { public: explicit ThriftDoneWrapper(::google::protobuf::Closure* done) : _done(done) {} void Run() override { _done->Run(); delete this; } private: ::google::protobuf::Closure* _done; public: ThriftMessageWrapper<RESPONSE> raw_response_wrapper; ThriftFramedMessage response; }; } …… 略 …… template <typename REQUEST, typename RESPONSE> void ThriftStub::CallMethod(const char* method_name, Controller* cntl, const REQUEST* raw_request, RESPONSE* raw_response, ::google::protobuf::Closure* done) { cntl->_thrift_method_name.assign(method_name); details::ThriftMessageWrapper<REQUEST> raw_request_wrapper(const_cast<REQUEST*>(raw_request)); ThriftFramedMessage request; request._raw_instance = &raw_request_wrapper; if (done == NULL) { ThriftFramedMessage response; details::ThriftMessageWrapper<RESPONSE> raw_response_wrapper(raw_response); response._raw_instance = &raw_response_wrapper; _channel->CallMethod(NULL, cntl, &request, &response, NULL); } else { details::ThriftDoneWrapper<RESPONSE>* new_done = new details::ThriftDoneWrapper<RESPONSE>(done); new_done->raw_response_wrapper.msg_ptr = raw_response; new_done->response._raw_instance = &new_done->raw_response_wrapper; _channel->CallMethod(NULL, cntl, &request, &new_done->response, new_done); } }