https://github.com/zq2599/blog_demos
内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;
名称 | 链接 | 备注 |
---|---|---|
项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |
git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |
// gRPC服务,这是个在线商城的库存服务 service StockService { // 双向流式:批量扣减库存 rpc BatchDeduct (stream ProductOrder) returns (stream DeductReply) {} } // 扣减库存返回结果的数据结构 message DeductReply { // 返回码 int32 code = 1; // 描述信息 string message = 2; }
// 使用springboot插件 plugins { id 'org.springframework.boot' } dependencies { implementation 'org.projectlombok:lombok' implementation 'org.springframework.boot:spring-boot-starter' // 作为gRPC服务提供方,需要用到此库 implementation 'net.devh:grpc-server-spring-boot-starter' // 依赖自动生成源码的工程 implementation project(':grpc-lib') // annotationProcessor不会传递,使用了lombok生成代码的模块,需要自己声明annotationProcessor annotationProcessor 'org.projectlombok:lombok' }
spring: application: name: double-stream-server-side # gRPC有关的配置,这里只需要配置服务端口号 grpc: server: port: 9901
package grpctutorials; import com.bolingcavalry.grpctutorials.lib.DeductReply; import com.bolingcavalry.grpctutorials.lib.ProductOrder; import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc; import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; import net.devh.boot.grpc.server.service.GrpcService; @GrpcService @Slf4j public class GrpcServerService extends StockServiceGrpc.StockServiceImplBase { @Override public StreamObserver<ProductOrder> batchDeduct(StreamObserver<DeductReply> responseObserver) { // 返回匿名类,给上层框架使用 return new StreamObserver<ProductOrder>() { private int totalCount = 0; @Override public void onNext(ProductOrder value) { log.info("正在处理商品[{}],数量为[{}]", value.getProductId(), value.getNumber()); // 增加总量 totalCount += value.getNumber(); int code; String message; // 假设单数的都有库存不足的问题 if (0 == value.getNumber() % 2) { code = 10000; message = String.format("商品[%d]扣减库存数[%d]成功", value.getProductId(), value.getNumber()); } else { code = 10001; message = String.format("商品[%d]扣减库存数[%d]失败", value.getProductId(), value.getNumber()); } responseObserver.onNext(DeductReply.newBuilder() .setCode(code) .setMessage(message) .build()); } @Override public void one rror(Throwable t) { log.error("批量减扣库存异常", t); } @Override public void onCompleted() { log.info("批量减扣库存完成,共计[{}]件商品", totalCount); responseObserver.onCompleted(); } }; } }
plugins { id 'org.springframework.boot' } dependencies { implementation 'org.projectlombok:lombok' implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'net.devh:grpc-client-spring-boot-starter' implementation project(':grpc-lib') }
server: port: 8082 spring: application: name: double-stream-client-side grpc: client: # gRPC配置的名字,GrpcClient注解会用到 double-stream-server-side: # gRPC服务端地址 address: 'static://127.0.0.1:9901' enableKeepAlive: true keepAliveWithoutCalls: true negotiationType: plaintext
启动类DoubleStreamClientSideApplication.java的代码就不贴了,普通的springboot启动类而已;
正常情况下我们都是用StreamObserver处理服务端响应,这里由于是异步响应,需要额外的方法从StreamObserver中取出业务数据,于是定一个新接口,继承自StreamObserver,新增getExtra方法可以返回String对象,详细的用法稍后会看到:
package com.bolingcavalry.grpctutorials; import io.grpc.stub.StreamObserver; public interface ExtendResponseObserver<T> extends StreamObserver<T> { String getExtra(); }
package grpctutorials; import com.bolingcavalry.grpctutorials.lib.DeductReply; import com.bolingcavalry.grpctutorials.lib.ProductOrder; import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc; import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; import net.devh.boot.grpc.client.inject.GrpcClient; import org.springframework.stereotype.Service; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @Service @Slf4j public class GrpcClientService { @GrpcClient("double-stream-server-side") private StockServiceGrpc.StockServiceStub stockServiceStub; /** * 批量减库存 * @param count * @return */ public String batchDeduct(int count) { CountDownLatch countDownLatch = new CountDownLatch(1); // responseObserver的onNext和onCompleted会在另一个线程中被执行, // ExtendResponseObserver继承自StreamObserver ExtendResponseObserver<DeductReply> responseObserver = new ExtendResponseObserver<DeductReply>() { // 用stringBuilder保存所有来自服务端的响应 private StringBuilder stringBuilder = new StringBuilder(); @Override public String getExtra() { return stringBuilder.toString(); } /** * 客户端的流式请求期间,每一笔请求都会收到服务端的一个响应, * 对应每个响应,这里的onNext方法都会被执行一次,入参是响应内容 * @param value */ @Override public void onNext(DeductReply value) { log.info("batch deduct on next"); // 放入匿名类的成员变量中 stringBuilder.append(String.format("返回码[%d],返回信息:%s<br>" , value.getCode(), value.getMessage())); } @Override public void one rror(Throwable t) { log.error("batch deduct gRPC request error", t); stringBuilder.append("batch deduct gRPC error, " + t.getMessage()); countDownLatch.countDown(); } /** * 服务端确认响应完成后,这里的onCompleted方法会被调用 */ @Override public void onCompleted() { log.info("batch deduct on complete"); // 执行了countDown方法后,前面执行countDownLatch.await方法的线程就不再wait了, // 会继续往下执行 countDownLatch.countDown(); } }; // 远程调用,此时数据还没有给到服务端 StreamObserver<ProductOrder> requestObserver = stockServiceStub.batchDeduct(responseObserver); for(int i=0; i<count; i++) { // 每次执行onNext都会发送一笔数据到服务端, // 服务端的onNext方法都会被执行一次 requestObserver.onNext(build(101 + i, 1 + i)); } // 客户端告诉服务端:数据已经发完了 requestObserver.onCompleted(); try { // 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行, // 那里会执行countDownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了, // await的超时时间设置为2秒 countDownLatch.await(2, TimeUnit.SECONDS); } catch (InterruptedException e) { log.error("countDownLatch await error", e); } log.info("service finish"); // 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得 return responseObserver.getExtra(); } /** * 创建ProductOrder对象 * @param productId * @param num * @return */ private static ProductOrder build(int productId, int num) { return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build(); } }
package grpctutorials; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class GrpcClientController { @Autowired private GrpcClientService grpcClientService; @RequestMapping("/") public String printMessage(@RequestParam(defaultValue = "1") int count) { return grpcClientService.batchDeduct(count); } }
微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...
https://github.com/zq2599/blog_demos