在 gRPC(1):入门及简单使用(go) 中,我们实现了一个简单的 gRPC 应用程序,其中双方通信是简单的请求—响应模式,没发出一个请求都会得到一个响应,然而,借助 gRPC 可以实现不同的通信模式,这里介绍四种 gRPC 应用程序的基础通信模式:一元RPC、服务端流RPC、客户端流RPC、双向流RPC
一元 RPC 也被称为简单 RPC, 其实就是 gRPC(1):入门及简单使用(go) 中实现的请求—响应模式,每调用一次得到一个结果,这里再以一个简单的订单管理程序做说明,实现两个服务:addOrder 用于添加订单;getOrder 用于根据 id 获取订单:
syntax = "proto3"; package proto; option go_package = "./proto"; service OrderManagement { rpc addOrder(Order) returns (StringValue); rpc getOrder(StringValue) returns (Order); } message Order { string id = 1; repeated string items = 2; // repeated 表示列表 string description = 3; float price = 4; string destination = 5; } message StringValue { string value = 1; }
package main import ( "context" "fmt" "log" "net" "strings" pb "order/proto" "github.com/gofrs/uuid" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) const ( port = ":50051" ) type server struct { pb.UnimplementedOrderManagementServer } // 模拟存储 var orderMap = make(map[string]*pb.Order) func (s *server) AddOrder(ctx context.Context, order *pb.Order) (*pb.StringValue, error) { id, err := uuid.NewV4() if err != nil { return nil, status.Errorf(codes.Internal, "Error while generating Product ID", err) } order.Id = id.String() orderMap[order.Id] = order log.Printf("Order %v : %v - Added.", order.Id, order.Description) return &pb.StringValue{Value: order.Id}, nil } func (s *server) GetOrder(ctx context.Context, orderID *pb.StringValue) (*pb.Order, error) { order, exists := orderMap[orderID.Value] if exists && order != nil { log.Printf("Order %v : %v - Retrieved.", order.Id, order.Description) return order, nil } return nil, status.Errorf(codes.NotFound, "Order does not exist.", orderID.Value) } func main() { lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterOrderManagementServer(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }
package main import ( "context" "io" "log" "time" pb "order/proto" "google.golang.org/grpc" ) const ( address = "localhost:50051" ) func main() { conn, err := grpc.Dial(address, grpc.WithInsecure()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewOrderManagementClient(conn) orderID, err := c.AddOrder(context.Background(), &pb.Order{ Items: []string{"XiaoMI 11"}, Description: "XiaoMI 11", Price: 3999, Destination: "suzhou", }) if err != nil { log.Fatalf("could not add order: %v", err) } log.Printf("Added order: %v", orderID.Value) }
与一元 RPC 不同的是,流模式下响应或者请求都可以是一个序列,这个序列也被称为”流“,服务端流 RPC 下,客户端发出一个请求,但不会立即得到一个响应,而是在服务端与客户端之间建立一个单向的流,服务端可以随时向流中写入多个响应消息,最后主动关闭流,而客户端需要监听这个流,不断获取响应直到流关闭
下面以一个简单的关键词搜索功能为例,客户端发送关键字,服务端进行匹配,每找到一个就写进流中,在之前的基础上添加代码:
service OrderManagement { ... // stream 将返回参数指定为订单流 rpc searchOrders(StringValue) returns (stream Order); }
func (s *server) SearchOrders(searchQuery *pb.StringValue, stream pb.OrderManagement_SearchOrdersServer) error { for key, order := range orderMap { for _, item := range order.Items { if strings.Contains(item, searchQuery.Value) { err := stream.Send(&order) if err != nil { return fmt.Errorf("error sending message to stream: %v", err) } log.Printf("order found: " + key) break } } } return nil }
... // 获得建立的流对象 stream, err := c.SearchOrders(context.Background(), &pb.StringValue{Value: "XiaoMI"}) if err != nil { log.Fatalf("search error: %v", err) } for { // 循环读取 order, err := stream.Recv() if err == io.EOF { log.Print("EOF") break } if err != nil { log.Fatal("error: ", err) } log.Print(order) }
客户端流,和服务端流一样的道理,只不过流的方向变为从客户端到服务端,可以发送多条响应,服务端只会响应一次,但何时响应取决于服务端的逻辑,以更新订单序列为例,客户端可以发送一系列订单,服务端可以选择在任意时候停止读取并发送响应:
service OrderManagement { ... rpc updateOrders(stream Order) returns (StringValue); }
func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error { for { order, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&pb.StringValue{Value: "finished"}) } if err != nil { return err } orderMap[order.Id] = order log.Print("OrderID " + order.Id + " updated") } }
// 取得流 updateStream, err := c.UpdateOrders(context.Background()) if err != nil { log.Fatalf("update err: %v", err) } // 发送 Order1 if err = updateStream.Send(&pb.Order{ Id: "1", Items: []string{"Huawei P50"}, Description: "Huawei P50", Price: 5999, Destination: "suzhou", }); err != nil { log.Fatalf("send error: %v", err) } // 发送 Order2 if err = updateStream.Send(&pb.Order{ Id: "2", Items: []string{"iphone 12"}, Description: "iphone 12", Price: 8999, Destination: "suzhou", }); err != nil { log.Fatalf("send error: %v", err) } ... // 关闭流,结束发送 updateRes, err := updateStream.CloseAndRecv() if err != nil { log.Fatalf("update stream close error: %v", err) } log.Printf("update res: %v", updateRes)
双向流,顾名思义,由客户端发起调用后,将建立起双向的流,在这之后,通信将完全基于双方的应用逻辑,流的操作完全独立,客户端和服务端可以按照任意顺序进行读取和写入,以一个订单筛选过程为例,客户端发送一串订单 ID 序列,服务端进行检查,每遇到一个有效的 ID 就写入流中响应:
service OrderManagement { ... rpc processOrders(stream StringValue) returns (stream StringValue); }
func (s *server) ProcessOrders(stream pb.OrderManagement_ProcessOrdersServer) error { for { orderId, err := stream.Recv() if err == io.EOF { return nil } if err != nil { return err } order, exists := orderMap[orderId.Value] if exists && order != nil { stream.Send(&pb.StringValue{Value: order.Id}) } } }
... // 取得双向流 processStream, err := c.ProcessOrders(context.Background()) // 同步channel,防止主程序提前退出 waitc := make(chan struct{}) // 双向流是完全异步的,开一个协程用于读取响应 go func() { for { orderId, err := processStream.Recv() if err == io.EOF { close(waitc) return } if err != nil { log.Fatalf("recv error: %v", err) } log.Print("recv " + orderId.Value) } }() // 请求 if err = processStream.Send(&pb.StringValue{Value: "1"}); err != nil { log.Fatalf("1 send error: %v", err) } if err = processStream.Send(&pb.StringValue{Value: "2"}); err != nil { log.Fatalf("2 send error: %v", err) } if err = processStream.Send(&pb.StringValue{Value: "3"}); err != nil { log.Fatalf("3 send error: %v", err) } if err = processStream.CloseSend(); err != nil { log.Fatal(err) } // 等待读取结束 <-waitc
这就是 gRPC 中主要的四种通信模式,基于它们可以实现各种 gRPC 场景下的交互,至于选择哪种,还需根据具体的场景考虑