https://github.com/zq2599/blog_demos
内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;
本文是《java版gRPC实战》系列的第六篇,前面咱们在开发客户端应用时,所需的服务端地址都是按如下步骤设置的:
local-server应用是个简单的gRPC服务端,其详细信息请参考《java版gRPC实战之二:服务发布和调用》
本篇由以下章节组成:
名称 | 链接 | 备注 |
---|---|---|
项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |
git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |
plugins { id 'org.springframework.boot' } dependencies { implementation 'org.projectlombok:lombok' implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'net.devh:grpc-client-spring-boot-starter' implementation 'io.etcd:jetcd-core' implementation project(':grpc-lib') }
server: port: 8084 spring: application: name: get-service-addr-from-etcd grpc: # etcd的地址,从此处取得gRPC服务端的IP和端口 etcdendpoints: 'http://192.168.72.128:2379,http://192.168.50.239:2380,http://192.168.50.239:2381'
启动类DynamicServerAddressDemoApplication.java的代码就不贴了,普通的springboot启动类而已;
新增StubWrapper.java文件,这是个spring bean,要重点关注的是simpleBlockingStub方法,当bean在spring注册的时候simpleBlockingStub方法会被执行,这样每当bean在spring注册时,都会从etcd查询gRPC服务端信息,然后创建SimpleBlockingStub对象:
package com.bolingcavalry.dynamicrpcaddr; import com.bolingcavalry.grpctutorials.lib.SimpleGrpc; import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.Client; import io.etcd.jetcd.KV; import io.etcd.jetcd.kv.GetResponse; import io.grpc.Channel; import io.grpc.ManagedChannelBuilder; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Arrays; import static com.google.common.base.Charsets.UTF_8; /** * @author will (zq2599@gmail.com) * @version 1.0 * @description: 包装了SimpleBlockingStub实例的类,发起gRPC请求时需要用到SimpleBlockingStub实例 * @date 2021/5/8 19:34 */ @Component("stubWrapper") @Data @Slf4j @ConfigurationProperties(prefix = "grpc") public class StubWrapper { /** * 这是etcd中的一个key,该key对应的值是grpc服务端的地址信息 */ private static final String GRPC_SERVER_INFO_KEY = "/grpc/local-server"; /** * 配置文件中写好的etcd地址 */ private String etcdendpoints; private SimpleGrpc.SimpleBlockingStub simpleBlockingStub; /** * 从etcd查询gRPC服务端的地址 * @return */ public String[] getGrpcServerInfo() { // 创建client类 KV kvClient = Client.builder().endpoints(etcdendpoints.split(",")).build().getKVClient(); GetResponse response = null; // 去etcd查询/grpc/local-server这个key的值 try { response = kvClient.get(ByteSequence.from(GRPC_SERVER_INFO_KEY, UTF_8)).get(); } catch (Exception exception) { log.error("get grpc key from etcd error", exception); } if (null==response || response.getKvs().isEmpty()) { log.error("empty value of key [{}]", GRPC_SERVER_INFO_KEY); return null; } // 从response中取得值 String rawAddrInfo = response.getKvs().get(0).getValue().toString(UTF_8); // rawAddrInfo是“192.169.0.1:8080”这样的字符串,即一个IP和一个端口,用":"分割, // 这里用":"分割成数组返回 return null==rawAddrInfo ? null : rawAddrInfo.split(":"); } /** * 每次注册bean都会执行的方法, * 该方法从etcd取得gRPC服务端地址, * 用于实例化成员变量SimpleBlockingStub */ @PostConstruct public void simpleBlockingStub() { // 从etcd获取地址信息 String[] array = getGrpcServerInfo(); log.info("create stub bean, array info from etcd {}", Arrays.toString(array)); // 数组的第一个元素是gRPC服务端的IP地址,第二个元素是端口 if (null==array || array.length<2) { log.error("can not get valid grpc address from etcd"); return; } // 数组的第一个元素是gRPC服务端的IP地址 String addr = array[0]; // 数组的第二个元素是端口 int port = Integer.parseInt(array[1]); // 根据刚才获取的gRPC服务端的地址和端口,创建channel Channel channel = ManagedChannelBuilder .forAddress(addr, port) .usePlaintext() .build(); // 根据channel创建stub simpleBlockingStub = SimpleGrpc.newBlockingStub(channel); } }
package com.bolingcavalry.dynamicrpcaddr; import com.bolingcavalry.grpctutorials.lib.HelloReply; import com.bolingcavalry.grpctutorials.lib.HelloRequest; import com.bolingcavalry.grpctutorials.lib.SimpleGrpc; import io.grpc.StatusRuntimeException; import lombok.Setter; import net.devh.boot.grpc.client.inject.GrpcClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class GrpcClientService { @Autowired(required = false) @Setter private StubWrapper stubWrapper; public String sendMessage(final String name) { // 很有可能simpleStub对象为null if (null==stubWrapper) { return "invalid SimpleBlockingStub, please check etcd configuration"; } try { final HelloReply response = stubWrapper.getSimpleBlockingStub().sayHello(HelloRequest.newBuilder().setName(name).build()); return response.getMessage(); } catch (final StatusRuntimeException e) { return "FAILED with " + e.getStatus().getCode().name(); } } }
package com.bolingcavalry.dynamicrpcaddr; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class GrpcClientController { @Autowired private GrpcClientService grpcClientService; @RequestMapping("/") public String printMessage(@RequestParam(defaultValue = "will") String name) { return grpcClientService.sendMessage(name); } }
package com.bolingcavalry.dynamicrpcaddr; import com.bolingcavalry.grpctutorials.lib.SimpleGrpc; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionRegistry; import org.springframework.beans.factory.support.DefaultListableBeanFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class RefreshStubInstanceController implements ApplicationContextAware { private ApplicationContext applicationContext; @Autowired private GrpcClientService grpcClientService; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @RequestMapping("/refreshstub") public String refreshstub() { String beanName = "stubWrapper"; //获取BeanFactory DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory(); // 删除已有bean defaultListableBeanFactory.removeBeanDefinition(beanName); //创建bean信息. BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(StubWrapper.class); //动态注册bean. defaultListableBeanFactory.registerBeanDefinition(beanName, beanDefinitionBuilder.getBeanDefinition()); // 更新引用关系(注意,applicationContext.getBean方法很重要,会触发StubWrapper实例化操作) grpcClientService.setStubWrapper(applicationContext.getBean(StubWrapper.class)); return "Refresh success"; } }
部署gRPC服务端应用很简单,启动local-server应用即可:
version: '3' services: etcd1: image: "quay.io/coreos/etcd:v3.4.7" entrypoint: /usr/local/bin/etcd command: - '--name=etcd1' - '--data-dir=/etcd_data' - '--initial-advertise-peer-urls=http://etcd1:2380' - '--listen-peer-urls=http://0.0.0.0:2380' - '--listen-client-urls=http://0.0.0.0:2379' - '--advertise-client-urls=http://etcd1:2379' - '--initial-cluster-token=etcd-cluster' - '--heartbeat-interval=250' - '--election-timeout=1250' - '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380' - '--initial-cluster-state=new' ports: - 2379:2379 volumes: - ./store/etcd1/data:/etcd_data etcd2: image: "quay.io/coreos/etcd:v3.4.7" entrypoint: /usr/local/bin/etcd command: - '--name=etcd2' - '--data-dir=/etcd_data' - '--initial-advertise-peer-urls=http://etcd2:2380' - '--listen-peer-urls=http://0.0.0.0:2380' - '--listen-client-urls=http://0.0.0.0:2379' - '--advertise-client-urls=http://etcd2:2379' - '--initial-cluster-token=etcd-cluster' - '--heartbeat-interval=250' - '--election-timeout=1250' - '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380' - '--initial-cluster-state=new' ports: - 2380:2379 volumes: - ./store/etcd2/data:/etcd_data etcd3: image: "quay.io/coreos/etcd:v3.4.7" entrypoint: /usr/local/bin/etcd command: - '--name=etcd3' - '--data-dir=/etcd_data' - '--initial-advertise-peer-urls=http://etcd3:2380' - '--listen-peer-urls=http://0.0.0.0:2380' - '--listen-client-urls=http://0.0.0.0:2379' - '--advertise-client-urls=http://etcd3:2379' - '--initial-cluster-token=etcd-cluster' - '--heartbeat-interval=250' - '--election-timeout=1250' - '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380' - '--initial-cluster-state=new' ports: - 2381:2379 volumes: - ./store/etcd3/data:/etcd_data
docker exec 08_etcd2_1 /usr/local/bin/etcdctl put /grpc/local-server 192.168.50.5:9898
现在执行以下命令,将etcd中的服务端信息改为正确的:
docker exec 08_etcd2_1 /usr/local/bin/etcdctl put /grpc/local-server 192.168.50.5:9899
至此,在不修改配置不重启服务的情况下,客户端也可以适应服务端的变化了,当然了,本文只是提供基本的操作参考,实际上的微服务环境会更复杂,例如refreshstub接口可能被其他服务调用,这样服务端有了变化可以更加及时地被更新,还有客户端本身也肯能是gRPC服务提供方,那也要把自己注册到etcd上去,还有利用etcd的watch功能监控指定的服务端是否一直存活,以及同一个gRPC服务的多个实例如何做负载均衡,等等,这些都要根据您的实际情况来定制;
本篇内容过多,可见对于这些官方不支持的微服务环境,咱们自己去做注册发现的适配很费时费力的,如果设计和选型能自己做主,我们更倾向于使用现成的注册中心,接下来的文章,咱们就一起尝试使用eureka为gRPC提供注册发现服务;
微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...
https://github.com/zq2599/blog_demos