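When we release a system to production, we sometimes want a small group of specific users (for example, users of a client's internal beta build) to use the new version while everyone else stays on the old one, so that we can confirm the new service logic works. How can this be done in Spring Cloud?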
Load-balancing component used: Spring Cloud LoadBalancer
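According to the official Spring Cloud documentation, there are roughly 2 ways to achieve this:
1. Implement the ReactiveLoadBalancer interface and override the load-balancing algorithm.
2. Implement the ServiceInstanceListSupplier interface and override the get method to return a custom list of service instances.
ServiceInstanceListSupplier: controls which instances are candidates. For example, if our user-service has 5 instances in the registry, we can return only 3 of them here.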
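To make the difference between the two extension points concrete, here is a minimal plain-Java sketch that does not use the Spring Cloud API at all. The class name, the `supply` and `choose` methods, and the `user-service:900x` addresses are hypothetical: `supply` plays the role of an instance-list supplier (narrowing 5 registered instances to 3), and `choose` plays the role of a load-balancing algorithm (picking one instance from whatever list it is given).

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class ExtensionPointSketch {

    // Role of the instance-list supplier: decide WHICH instances are candidates.
    static List<String> supply(List<String> registered, Predicate<String> keep) {
        return registered.stream().filter(keep).collect(Collectors.toList());
    }

    // Role of the load-balancing algorithm: pick ONE instance from the candidates.
    // Here: simple round-robin driven by a caller-supplied request counter.
    static String choose(List<String> candidates, int requestNumber) {
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no instance available");
        }
        return candidates.get(Math.floorMod(requestNumber, candidates.size()));
    }

    public static void main(String[] args) {
        // Hypothetical addresses: 5 instances of user-service in the registry.
        List<String> registered = Arrays.asList(
                "user-service:9001", "user-service:9002", "user-service:9003",
                "user-service:9004", "user-service:9005");

        // The supplier keeps only 3 of the 5 instances.
        List<String> candidates = supply(registered,
                address -> !address.endsWith("9004") && !address.endsWith("9005"));

        // The algorithm then rotates over those 3 only.
        for (int i = 0; i < 4; i++) {
            System.out.println(choose(candidates, i));
        }
    }
}
```

The rest of this article takes the second route: it leaves the selection algorithm alone and only changes the candidate list.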
The official Spring Cloud documentation also shows that the class org.springframework.cloud.loadbalancer.core.HintBasedServiceInstanceListSupplier provides similar functionality.
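Someone might ask: since Spring Cloud already provides this feature, why write another one? This is simply a record. Requirements at work vary widely, and if a similar need comes up later, this note shows how to implement it.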
package com.huan.loadbalancer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.RequestDataContext;
import org.springframework.cloud.loadbalancer.core.DelegatingServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.http.HttpHeaders;
import reactor.core.publisher.Flux;

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
 * Custom supplier that returns the list of service instances for a service name.
 * <p>
 * Requirement: users reach the services through the gateway.<br />
 * 1. If the version request header equals the version in a downstream instance's metadata, select that instance.<br />
 * 2. If no instance matches the version request header and no instance has version "default", the request fails with an error.<br />
 * 3. If no instance matches the version request header but an instance with version "default" exists, select that instance.<br />
 * <p>
 * Modelled on {@link org.springframework.cloud.loadbalancer.core.HintBasedServiceInstanceListSupplier}
 *
 * @author huan.fu
 * @date 2023/6/19 - 21:14
 */
@Slf4j
public class VersionServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {

    /**
     * Name of the request header. Its value is compared with the version field
     * in each instance's metadata to obtain the final instance list.
     */
    private static final String VERSION_HEADER_NAME = "version";

    public VersionServiceInstanceListSupplier(ServiceInstanceListSupplier delegate) {
        super(delegate);
    }

    @Override
    public Flux<List<ServiceInstance>> get() {
        // Without a request there is no header to inspect, so return the delegate's list unchanged
        return delegate.get();
    }

    @Override
    public Flux<List<ServiceInstance>> get(Request request) {
        return delegate.get(request)
                .map(instances -> filteredByVersion(instances, getVersion(request.getContext())));
    }

    private String getVersion(Object requestContext) {
        if (requestContext == null) {
            return null;
        }
        String version = null;
        if (requestContext instanceof RequestDataContext) {
            version = getVersionFromHeader((RequestDataContext) requestContext);
        }
        log.info("Requested version for service [{}]: [{}]", getServiceId(), version);
        return version;
    }

    /**
     * Reads the version from the request headers.
     */
    private String getVersionFromHeader(RequestDataContext context) {
        if (context.getClientRequest() != null) {
            HttpHeaders headers = context.getClientRequest().getHeaders();
            if (headers != null) {
                return headers.getFirst(VERSION_HEADER_NAME);
            }
        }
        return null;
    }

    private List<ServiceInstance> filteredByVersion(List<ServiceInstance> instances, String version) {
        // 1. Instances whose metadata version equals the version request header
        List<ServiceInstance> selectServiceInstances = instances.stream()
                .filter(instance -> instance.getMetadata().get(VERSION_HEADER_NAME) != null
                        && Objects.equals(version, instance.getMetadata().get(VERSION_HEADER_NAME)))
                .collect(Collectors.toList());
        if (!selectServiceInstances.isEmpty()) {
            log.info("Service [{}] has [{}] instance(s) with version [{}]",
                    getServiceId(), selectServiceInstances.size(), version);
            return selectServiceInstances;
        }

        // 2. Otherwise fall back to the instances with version=default
        selectServiceInstances = instances.stream()
                .filter(instance -> Objects.equals(instance.getMetadata().get(VERSION_HEADER_NAME), "default"))
                .collect(Collectors.toList());
        log.info("Service [{}] has [{}] instance(s) with version [{}]",
                getServiceId(), selectServiceInstances.size(), "default");
        // This list may be empty, in which case the load balancer has no instance to choose from
        return selectServiceInstances;
    }
}
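The three selection rules can also be checked in isolation, without a registry or a running service. The following is a minimal plain-Java sketch of the same decision logic, not the Spring Cloud class itself. `VersionFilterSketch`, its nested `Instance` type, and the `select` method are hypothetical stand-ins, and the addresses mirror the provider ports used later in this article.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class VersionFilterSketch {

    static final String VERSION_KEY = "version";
    static final String DEFAULT_VERSION = "default";

    /** Hypothetical stand-in for a service instance: an address plus registry metadata. */
    static final class Instance {
        final String address;
        final Map<String, String> metadata;

        Instance(String address, String version) {
            this.address = address;
            this.metadata = Collections.singletonMap(VERSION_KEY, version);
        }
    }

    /** Returns the addresses to use: exact version matches first, else the "default" instances. */
    static List<String> select(List<Instance> instances, String requestedVersion) {
        List<Instance> exact = withVersion(instances, requestedVersion);
        List<Instance> chosen = exact.isEmpty() ? withVersion(instances, DEFAULT_VERSION) : exact;
        return chosen.stream().map(i -> i.address).collect(Collectors.toList());
    }

    private static List<Instance> withVersion(List<Instance> instances, String version) {
        if (version == null) {
            // No header was sent, so nothing can match exactly
            return Collections.emptyList();
        }
        return instances.stream()
                .filter(i -> version.equals(i.metadata.get(VERSION_KEY)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Instance> providers = Arrays.asList(
                new Instance("provider:8005", "v1"),
                new Instance("provider:8006", "v1"),
                new Instance("provider:8007", "default"));

        System.out.println(select(providers, "v1"));  // [provider:8005, provider:8006]
        System.out.println(select(providers, null));  // [provider:8007]
        System.out.println(select(providers, "v9"));  // [provider:8007]
    }
}
```

When neither an exact match nor a `default` instance exists, `select` returns an empty list. That corresponds to rule 2: the caller has nothing to pick from, so the request fails.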
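package com.huan.loadbalancer;

import feign.RequestInterceptor;
import feign.RequestTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

/**
 * Passes the version request header downstream through feign
 *
 * @author huan.fu
 * @date 2023/6/20 - 08:27
 */
@Component
@Slf4j
public class VersionRequestInterceptor implements RequestInterceptor {

    private static final String VERSION_HEADER_NAME = "version";

    @Override
    public void apply(RequestTemplate requestTemplate) {
        // The attributes are bound to the current thread and are null outside a servlet request
        RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
        if (!(attributes instanceof ServletRequestAttributes)) {
            return;
        }
        String version = ((ServletRequestAttributes) attributes).getRequest()
                .getHeader(VERSION_HEADER_NAME);
        log.info("Value of the version request header passed through feign: [{}]", version);
        // Only forward the header when the inbound request actually carried it
        if (version != null) {
            requestTemplate.header(VERSION_HEADER_NAME, version);
        }
    }
}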
Note:
A global feign interceptor is configured here to pass the version request header on to downstream services.
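A forwarding step like this has two edge cases worth handling explicitly: there may be no current inbound request (for example, when the call is made from a background thread), and the inbound request may not carry the header at all. The following plain-Java sketch shows only that decision logic. It uses ordinary maps as hypothetical stand-ins for the inbound request headers and the outbound request, and `HeaderPropagationSketch` and `propagate` are illustrative names rather than part of feign. Real HTTP header names are case-insensitive, which a plain `Map` does not model.

```java
import java.util.HashMap;
import java.util.Map;

class HeaderPropagationSketch {

    static final String VERSION_HEADER = "version";

    /**
     * Copies the version header from the inbound headers to the outbound headers,
     * but only when an inbound request exists and the header is present and non-blank.
     */
    static void propagate(Map<String, String> inbound, Map<String, String> outbound) {
        if (inbound == null) {
            // No current request: leave the outbound request untouched
            return;
        }
        String version = inbound.get(VERSION_HEADER);
        if (version != null && !version.trim().isEmpty()) {
            outbound.put(VERSION_HEADER, version);
        }
    }

    public static void main(String[] args) {
        Map<String, String> inbound = new HashMap<>();
        inbound.put(VERSION_HEADER, "v1");

        Map<String, String> outbound = new HashMap<>();
        propagate(inbound, outbound);
        System.out.println(outbound); // {version=v1}
    }
}
```

Skipping the header when it is absent means the downstream supplier sees no version at all and falls back to the `default` instances.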
package com.huan.loadbalancer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Global configuration is used here
 *
 * @author huan.fu
 * @date 2023/6/19 - 22:16
 */
@Configuration
@Slf4j
@LoadBalancerClients(defaultConfiguration = VersionServiceInstanceListSupplierConfiguration.class)
public class VersionServiceInstanceListSupplierConfiguration {

    /**
     * Servlet (blocking) applications: wrap a supplier backed by the blocking discovery client.
     */
    @Bean
    @ConditionalOnClass(name = "org.springframework.web.servlet.DispatcherServlet")
    public VersionServiceInstanceListSupplier versionServiceInstanceListSupplierV1(
            ConfigurableApplicationContext context) {
        log.info("===========> versionServiceInstanceListSupplierV1");
        ServiceInstanceListSupplier delegate = ServiceInstanceListSupplier.builder()
                .withBlockingDiscoveryClient()
                .withCaching()
                .build(context);
        return new VersionServiceInstanceListSupplier(delegate);
    }

    /**
     * Reactive applications: wrap a supplier backed by the reactive discovery client.
     */
    @Bean
    @ConditionalOnClass(name = "org.springframework.web.reactive.DispatcherHandler")
    public VersionServiceInstanceListSupplier versionServiceInstanceListSupplierV2(
            ConfigurableApplicationContext context) {
        log.info("===========> versionServiceInstanceListSupplierV2");
        ServiceInstanceListSupplier delegate = ServiceInstanceListSupplier.builder()
                .withDiscoveryClient()
                .withCaching()
                .build(context);
        return new VersionServiceInstanceListSupplier(delegate);
    }
}
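To keep things simple, the supplier is configured globally here, so it applies to every load-balanced client: @Configuration @Slf4j @LoadBalancerClients(defaultConfiguration = VersionServiceInstanceListSupplierConfiguration.class)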
spring:
  application:
    name: lobalancer-gateway-8001
  cloud:
    nacos:
      discovery:
        # nacos server address
        server-addr: localhost:8848
        group: DEFAULT_GROUP
      config:
        server-addr: localhost:8848
    gateway:
      discovery:
        locator:
          enabled: true
server:
  port: 8001
logging:
  level:
    root: info
package com.huan.loadbalancer.controller;

import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * Provider controller
 *
 * @author huan.fu
 * @date 2023/3/6 - 21:58
 */
@RestController
public class ProviderController {

    @Resource
    private NacosDiscoveryProperties nacosDiscoveryProperties;

    /**
     * Returns the address this provider instance registered with nacos
     *
     * @return ip:port
     */
    @GetMapping("serverInfo")
    public String serverInfo() {
        return nacosDiscoveryProperties.getIp() + ":" + nacosDiscoveryProperties.getPort();
    }
}
spring:
  application:
    name: provider
  cloud:
    nacos:
      discovery:
        # nacos server address
        server-addr: localhost:8848
        # instance metadata
        metadata:
          version: v1
      config:
        server-addr: localhost:8848
server:
  port: 8005
Note the value of version in metadata.
spring:
  application:
    name: provider
  cloud:
    nacos:
      discovery:
        # nacos server address
        server-addr: localhost:8848
        # instance metadata
        metadata:
          version: v1
      config:
        server-addr: localhost:8848
server:
  port: 8006
Note the value of version in metadata.
spring:
  application:
    name: provider
  cloud:
    nacos:
      discovery:
        # nacos server address
        server-addr: localhost:8848
        # instance metadata
        metadata:
          version: default
      config:
        server-addr: localhost:8848
server:
  port: 8007
Note the value of version in metadata.
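package com.huan.loadbalancer.feign;

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;

/**
 * Feign client for the provider service
 *
 * @author huan.fu
 * @date 2023/6/19 - 22:21
 */
@FeignClient(value = "provider")
public interface FeignProvider {

    /**
     * Fetches the provider's server information
     *
     * @return ip:port
     */
    @GetMapping("serverInfo")
    String fetchServerInfo();
}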
package com.huan.loadbalancer.controller;

import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.huan.loadbalancer.feign.FeignProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * Consumer controller
 *
 * @author huan.fu
 * @date 2023/6/19 - 22:21
 */
@RestController
public class ConsumerController {

    @Resource
    private FeignProvider feignProvider;
    @Resource
    private NacosDiscoveryProperties nacosDiscoveryProperties;

    /**
     * Returns this consumer's address together with the address of the provider that served the call
     */
    @GetMapping("fetchProviderServerInfo")
    public Map<String, String> fetchProviderServerInfo() {
        Map<String, String> ret = new HashMap<>(4);
        // Key: consumer info
        ret.put("consumer信息", nacosDiscoveryProperties.getIp() + ":" + nacosDiscoveryProperties.getPort());
        // Key: provider info
        ret.put("provider信息", feignProvider.fetchServerInfo());
        return ret;
    }
}
spring:
  application:
    name: consumer
  cloud:
    nacos:
      discovery:
        # nacos server address
        server-addr: localhost:8848
        register-enabled: true
        service: nacos-feign-consumer
        group: DEFAULT_GROUP
        metadata:
          version: v1
      config:
        server-addr: localhost:8848
server:
  port: 8002
Note the value of version in metadata.
spring:
  application:
    name: consumer
  cloud:
    nacos:
      discovery:
        # nacos server address
        server-addr: localhost:8848
        register-enabled: true
        service: nacos-feign-consumer
        group: DEFAULT_GROUP
        metadata:
          version: v2
      config:
        server-addr: localhost:8848
server:
  port: 8003
Note the value of version in metadata.
spring:
  application:
    name: consumer
  cloud:
    nacos:
      discovery:
        # nacos server address
        server-addr: localhost:8848
        register-enabled: true
        service: nacos-feign-consumer
        group: DEFAULT_GROUP
        metadata:
          version: default
      config:
        server-addr: localhost:8848
server:
  port: 8004
Note the value of version in metadata.
As the figure above shows, when version=v1 the service consumer is consumer-8002 and the providers are provider-8005 and provider-8006.
➜ ~ curl --location --request GET 'http://localhost:8001/nacos-feign-consumer/fetchProviderServerInfo' \
--header 'version: v1'
{"consumer信息":"192.168.8.168:8002","provider信息":"192.168.8.168:8005"}%
➜ ~ curl --location --request GET 'http://localhost:8001/nacos-feign-consumer/fetchProviderServerInfo' \
--header 'version: v1'
{"consumer信息":"192.168.8.168:8002","provider信息":"192.168.8.168:8006"}%
➜ ~ curl --location --request GET 'http://localhost:8001/nacos-feign-consumer/fetchProviderServerInfo' \
--header 'version: v1'
{"consumer信息":"192.168.8.168:8002","provider信息":"192.168.8.168:8005"}%
➜ ~ curl --location --request GET 'http://localhost:8001/nacos-feign-consumer/fetchProviderServerInfo' \
--header 'version: v1'
{"consumer信息":"192.168.8.168:8002","provider信息":"192.168.8.168:8006"}%
➜ ~
As you can see, the consumer returns port 8002 and the provider returns port 8005 or 8006, which is as expected.
As the figure above shows, when the version header is not sent, the service consumer is consumer-8004 and the provider is provider-8007.
➜ ~ curl --location --request GET 'http://localhost:8001/nacos-feign-consumer/fetchProviderServerInfo'
{"consumer信息":"192.168.8.168:8004","provider信息":"192.168.8.168:8007"}%
➜ ~ curl --location --request GET 'http://localhost:8001/nacos-feign-consumer/fetchProviderServerInfo'
{"consumer信息":"192.168.8.168:8004","provider信息":"192.168.8.168:8007"}%
➜ ~ curl --location --request GET 'http://localhost:8001/nacos-feign-consumer/fetchProviderServerInfo'
{"consumer信息":"192.168.8.168:8004","provider信息":"192.168.8.168:8007"}%
➜ ~
As you can see, the consumer returns port 8004 and the provider returns port 8007, which is as expected.
https://gitee.com/huan1993/spring-cloud-alibaba-parent/tree/master/loadbalancer-supply-service-instance
1. https://docs.spring.io/spring-cloud-commons/docs/current/reference/html/#spring-cloud-loadbalancer