对于同步API而言,程序的吞吐量并不高。因为在每次发送一个gRPC请求时,会阻塞整个线程,必须等待服务端的ack回到客户端才能继续运行或者发送下一个请求,因此异步API是提升程序吞吐量的必要手段。
gRPC异步操作依赖于完成队列CompletionQueue
官网教程:https://grpc.io/docs/languages/cpp/async/
参考博客1:https://www.luozhiyun.com/archives/671
参考博客2:https://blog.miigon.net/posts/cn-so-difference-between-sync-and-async-grpc/
//客户端的类实例在初始化时就需要创建Channel和Stub了,具体看官方实例代码中的greeter_async_client.cc CompletionQueue cq;//创建完成队列 std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc( stub_->AsyncSayHello(&context, request, &cq));//将完成队列绑定到存根,进而创建出客户端异步响应读取器
class GreeterClient { public: explicit GreeterClient(std::shared_ptr<Channel> channel) : stub_(Greeter::NewStub(channel)) {} // 客户端SayHello void SayHello(const std::string& user) { // RPC请求数据封装 HelloRequest request; request.set_name(user); // 异步客户端请求,存储请求响应的状态和数据的结构体等,在下方进行的定义 AsyncClientCall* call = new AsyncClientCall; // 初始化response_reader // stub_->PrepareAsyncSayHello()创建一个RPC对象,但是不会立即启动RPC调用 call->response_reader = stub_->PrepareAsyncSayHello(&call->context, request, &cq_); // StartCall()方法发起真正的RPC请求 call->response_reader->StartCall(); // Finish()方法前两个参数用于指定响应数据的存储位置,第三个参数指定了该次RPC异步请求的地址 call->response_reader->Finish(&call->reply, &call->status, (void*)call); } // 不断循环监听完成队列,对响应进行处理 void AsyncCompleteRpc() { void* got_tag; bool ok = false; // 在队列为空时阻塞,队列中有响应结果时读取到got_tag和ok两个参数 // 前者是结果对应的RPC请求的地址,后者是响应的状态 while (cq_.Next(&got_tag, &ok)) { // 类型转换,获取到的实际上是此响应结果对应的RPC请求的地址,在这个地址下保存了实际的响应结果数据 AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag); // 验证请求是否真的完成了 GPR_ASSERT(ok); if (call->status.ok()) std::cout << "Greeter received: " << call->reply.message() << std::endl; else std::cout << "RPC failed" << std::endl; // 完成了响应的处理后,清除该RPC请求 delete call; } } private: // 异步客户端通话,存储一次RPC通话的信息,里面包含响应的状态和数据的结构 struct AsyncClientCall { // 服务器返回的响应数据 HelloReply reply; // 客户端的上下文信息,可以被用于向服务器传达额外信息或调整某些RPC行为 ClientContext context; // RPC响应的状态 Status status; // 客户端异步响应读取器 std::unique_ptr<ClientAsyncResponseReader<HelloReply>> response_reader; }; // 存根,在我们的视角里就是服务器端暴露的服务接口 std::unique_ptr<Greeter::Stub> stub_; // 完成队列,一个用于gRPC异步处理的生产者消费者队列 CompletionQueue cq_; }; int main(int argc, char** argv) { // 实例化一个客户端,需要一个信道,第二个参数表明该通道未经过身份验证 GreeterClient greeter(grpc::CreateChannel( "localhost:50051", grpc::InsecureChannelCredentials())); // 独立的异步响应处理线程 // 由于该方法是客户端类内的非静态方法,所以需要传入客户端类的实例表明归属 std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter); //发送异步的请求SayHello() for (int i = 0; i < 100; i++) { std::string user("world " + std::to_string(i)); greeter.SayHello(user); // The actual RPC call! } std::cout << "Press control-c to quit" << std::endl << std::endl; thread_.join(); //永远会阻塞,因为异步响应处理线程永远不会停止,必须ctrl+c才能退出 return 0; }
//服务器实现类 class ServerImpl final { public: ~ServerImpl() { server_->Shutdown(); // 关闭服务器后也要关闭完成队列 cq_->Shutdown(); } // There is no shutdown handling in this code. void Run() { // 服务器地址和端口 std::string server_address("0.0.0.0:50051"); // 服务器构建器 ServerBuilder builder; // 服务器IP与端口指定,第二个参数表示该通道未经过身份验证 builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); // 注册服务 builder.RegisterService(&service_); // 为当前服务器创建完成队列 cq_ = builder.AddCompletionQueue(); // 构建并启动服务器 server_ = builder.BuildAndStart(); std::cout << "Server listening on " << server_address << std::endl; // 运行服务器的主流程 HandleRpcs(); } private: // 处理一个请求所需要保存的状态、逻辑、数据被封装成了CallData类 class CallData { public: // 传入service实例和服务器端的完成队列,创建后status_是CREATE状态 CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq) : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) { // 立即调用服务逻辑 Proceed(); } void Proceed() { if (status_ == CREATE) { // 转换为PROCESS状态 status_ = PROCESS; // 作为初始化的一部分,请求系统开始处理SayHello请求。 // 此处的this指代此CallData实例 service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this); } else if (status_ == PROCESS) { // 在执行当前CallData的任务时,创建一个新的CallData实例去为新的请求服务 // 当前CallData会在FINISH阶段自行销毁 new CallData(service_, cq_); // 实际的逻辑处理 std::string prefix("Hello "); reply_.set_message(prefix + request_.name()); // 在完成后将状态置为FINISH。使用当前CallData的地址作为该事件的标签 status_ = FINISH; responder_.Finish(reply_, Status::OK, this); } else { GPR_ASSERT(status_ == FINISH); // 销毁当前CallData delete this; } } private: // 异步服务 Greeter::AsyncService* service_; //服务器端的完成队列,是一个生产者-消费者队列 ServerCompletionQueue* cq_; // 服务器端上下文信息,可以被用于向客户端传达额外信息、数据或调整某些RPC行为 ServerContext ctx_; // 客户端发来的请求 HelloRequest request_; // 服务端的响应 HelloReply reply_; // 发送服务端响应的工具 ServerAsyncResponseWriter<HelloReply> responder_; // 状态机定义 enum CallStatus { CREATE, PROCESS, FINISH }; CallStatus status_; // 当前的状态 }; // 如果有需求的话,服务器的处理可以是多线程的 void HandleRpcs() { // 创建一个新的CallData,将完成队列中的数据封装进去 new CallData(&service_, cq_.get()); void* tag; // 请求特有的标签,实际上是请求的RPC会话对象在客户端的地址信息 bool ok; while (true) { // 阻塞等待读取完成队列中的事件,每个事件使用一个标签进行标识,该标签是CallData实例的地址 // 完成队列的Next()方法应该每次都检查返回值,来确保事件和完成队列的状态正常 GPR_ASSERT(cq_->Next(&tag, &ok)); GPR_ASSERT(ok); // 从标签转换为CallData*类型,进而访问CallData中的方法 static_cast<CallData*>(tag)->Proceed(); } } // 当前服务器的完成队列 std::unique_ptr<ServerCompletionQueue> cq_; // 当前服务器的异步服务 Greeter::AsyncService service_; // 服务器实例 std::unique_ptr<Server> server_; }; int main(int argc, char** argv) { ServerImpl server; server.Run(); return 0; }