https://github.com/zq2599/blog_demos
内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;
为了突出重点,这里将几个关键的知识点提前给出:
名称 | 链接 | 备注 |
---|---|---|
项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |
git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |
// gRPC服务,这是个在线商城的购物车服务 service CartService { // 客户端流式:添加多个商品到购物车 rpc AddToCart (stream ProductOrder) returns (AddCartReply) {} } // 提交购物车时的产品信息 message ProductOrder { // 商品ID int32 productId = 1; // 商品数量 int32 number = 2; } // 提交购物车返回结果的数据结构 message AddCartReply { // 返回码 int32 code = 1; // 描述信息 string message = 2; }
// 使用springboot插件 plugins { id 'org.springframework.boot' } dependencies { implementation 'org.projectlombok:lombok' implementation 'org.springframework.boot:spring-boot-starter' // 作为gRPC服务提供方,需要用到此库 implementation 'net.devh:grpc-server-spring-boot-starter' // 依赖自动生成源码的工程 implementation project(':grpc-lib') // annotationProcessor不会传递,使用了lombok生成代码的模块,需要自己声明annotationProcessor annotationProcessor 'org.projectlombok:lombok' }
spring: application: name: client-stream-server-side # gRPC有关的配置,这里只需要配置服务端口号 grpc: server: port: 9900
启动类ClientStreamServerSideApplication.java的代码就不贴了,普通的springboot启动类而已;
重点是提供grpc服务的GrpcServerService.java,请结合前面小结的第五点来阅读代码,咱们要做的就是给上层框架返回一个匿名类,至于里面的onNext、onCompleted方法何时被调用是上层框架决定的,另外还准备了成员变量totalCount,这样就可以记录总数了:
package com.bolingcavalry.grpctutorials; import com.bolingcavalry.grpctutorials.lib.AddCartReply; import com.bolingcavalry.grpctutorials.lib.CartServiceGrpc; import com.bolingcavalry.grpctutorials.lib.ProductOrder; import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; import net.devh.boot.grpc.server.service.GrpcService; @GrpcService @Slf4j public class GrpcServerService extends CartServiceGrpc.CartServiceImplBase { @Override public StreamObserver<ProductOrder> addToCart(StreamObserver<AddCartReply> responseObserver) { // 返回匿名类,给上层框架使用 return new StreamObserver<ProductOrder>() { // 记录处理产品的总量 private int totalCount = 0; @Override public void onNext(ProductOrder value) { log.info("正在处理商品[{}],数量为[{}]", value.getProductId(), value.getNumber()); // 增加总量 totalCount += value.getNumber(); } @Override public void one rror(Throwable t) { log.error("添加购物车异常", t); } @Override public void onCompleted() { log.info("添加购物车完成,共计[{}]件商品", totalCount); responseObserver.onNext(AddCartReply.newBuilder() .setCode(10000) .setMessage(String.format("添加购物车完成,共计[%d]件商品", totalCount)) .build()); responseObserver.onCompleted(); } }; } }
plugins { id 'org.springframework.boot' } dependencies { implementation 'org.projectlombok:lombok' implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'net.devh:grpc-client-spring-boot-starter' implementation project(':grpc-lib') }
server: port: 8082 spring: application: name: client-stream-client-side grpc: client: # gRPC配置的名字,GrpcClient注解会用到 client-stream-server-side: # gRPC服务端地址 address: 'static://127.0.0.1:9900' enableKeepAlive: true keepAliveWithoutCalls: true negotiationType: plaintext
package com.bolingcavalry.grpctutorials; import io.grpc.stub.StreamObserver; public interface ExtendResponseObserver<T> extends StreamObserver<T> { String getExtra(); }
package com.bolingcavalry.grpctutorials; import com.bolingcavalry.grpctutorials.lib.AddCartReply; import com.bolingcavalry.grpctutorials.lib.CartServiceGrpc; import com.bolingcavalry.grpctutorials.lib.ProductOrder; import io.grpc.stub.StreamObserver; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import net.devh.boot.grpc.client.inject.GrpcClient; import org.springframework.stereotype.Service; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @Service @Slf4j public class GrpcClientService { @GrpcClient("client-stream-server-side") private CartServiceGrpc.CartServiceStub cartServiceStub; public String addToCart(int count) { CountDownLatch countDownLatch = new CountDownLatch(1); // responseObserver的onNext和onCompleted会在另一个线程中被执行, // ExtendResponseObserver继承自StreamObserver ExtendResponseObserver<AddCartReply> responseObserver = new ExtendResponseObserver<AddCartReply>() { String extraStr; @Override public String getExtra() { return extraStr; } private int code; private String message; @Override public void onNext(AddCartReply value) { log.info("on next"); code = value.getCode(); message = value.getMessage(); } @Override public void one rror(Throwable t) { log.error("gRPC request error", t); extraStr = "gRPC error, " + t.getMessage(); countDownLatch.countDown(); } @Override public void onCompleted() { log.info("on complete"); extraStr = String.format("返回码[%d],返回信息:%s" , code, message); countDownLatch.countDown(); } }; // 远程调用,此时数据还没有给到服务端 StreamObserver<ProductOrder> requestObserver = cartServiceStub.addToCart(responseObserver); for(int i=0; i<count; i++) { // 发送一笔数据到服务端 requestObserver.onNext(build(101 + i, 1 + i)); } // 客户端告诉服务端:数据已经发完了 requestObserver.onCompleted(); try { // 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行, // 那里会执行countDownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了, // await的超时时间设置为2秒 countDownLatch.await(2, TimeUnit.SECONDS); } catch (InterruptedException e) { log.error("countDownLatch await error", e); } log.info("service finish"); // 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得 return responseObserver.getExtra(); } /** * 创建ProductOrder对象 * @param productId * @param num * @return */ private static ProductOrder build(int productId, int num) { return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build(); } }
package com.bolingcavalry.grpctutorials; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import java.util.List; @RestController public class GrpcClientController { @Autowired private GrpcClientService grpcClientService; @RequestMapping("/") public String printMessage(@RequestParam(defaultValue = "1") int count) { return grpcClientService.addToCart(count); } }
微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...
https://github.com/zq2599/blog_demos