本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent
在前面一节,我们梳理了实现 Feign 断路器以及线程隔离的思路,这一节,我们先不看如何源码实现(因为源码中会包含负载均衡算法的改进部分),先来讨论下如何优化目前的负载均衡算法。
其中请求包含 traceId 是来自于我们使用了 spring-cloud-sleuth 链路追踪,基于这种机制我们能保证请求不会重试到之前已经调用过的实例。源码是:
//一定必须是实现ReactorServiceInstanceLoadBalancer //而不是ReactorLoadBalancer<ServiceInstance> //因为注册的时候是ReactorServiceInstanceLoadBalancer @Log4j2 public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer { private final ServiceInstanceListSupplier serviceInstanceListSupplier; //每次请求算上重试不会超过1分钟 //对于超过1分钟的,这种请求肯定比较重,不应该重试 private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES) //随机初始值,防止每次都是从第一个开始调用 .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000))); private final String serviceId; private final Tracer tracer; public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) { this.serviceInstanceListSupplier = serviceInstanceListSupplier; this.serviceId = serviceId; this.tracer = tracer; } //每次重试,其实都会调用这个 choose 方法重新获取一个实例 @Override public Mono<Response<ServiceInstance>> choose(Request request) { return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances)); } private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) { if (serviceInstances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } return getInstanceResponseByRoundRobin(serviceInstances); } private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) { if (serviceInstances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } //为了解决原始算法不同调用并发可能导致一个请求重试相同的实例 //从 sleuth 的 Tracer 中获取当前请求的上下文 Span currentSpan = tracer.currentSpan(); //如果上下文不存在,则可能不是前端用户请求,而是其他某些机制触发,我们就创建一个新的上下文 if (currentSpan == null) { currentSpan = tracer.newTrace(); } //从请求上下文中获取请求的 traceId,用来唯一标识一个请求 long l = currentSpan.context().traceId(); AtomicInteger seed = positionCache.get(l); int s = seed.getAndIncrement(); int pos = s % serviceInstances.size(); log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size()); return new DefaultResponse(serviceInstances.stream() //实例返回列表顺序可能不同,为了保持一致,先排序再取 .sorted(Comparator.comparing(ServiceInstance::getInstanceId)) .collect(Collectors.toList()).get(pos)); } }
但是在这次请求突增很多的时候,这种负载均衡算法还是给我们带来了问题。
首先,本次突增,我们并没有采取扩容,导致本次的性能压力对于压力的均衡分布非常敏感。举个例子是,假设微服务 A 有 9 个实例,在业务高峰点来的时候,最理想的情况是保证无论何时这 9 个负载压力都完全均衡,但是由于我们使用了初始值为随机数的原子变量 position,虽然从一天的总量上来看,负责均衡压力肯定是均衡,但是在某一小段时间内,很可能压力全都跑到了某几个实例上,导致这几个实例被压垮,熔断,然后又都跑到了另外的几个实例上,又被压垮,熔断,如此恶性循环。
然后,我们部署采用的是 k8s 部署,同一个虚拟机上面可能会跑很多微服务的 pod。在某些情况下,同一个微服务的多个 pod 可能会跑到同一个虚拟机 Node 上,这个可以从pod 的 ip 网段上看出来:例如某个微服务有如下 7 个实例:10.238.13.12:8181,10.238.13.24:8181,10.238.15.12:8181,10.238.17.12:8181,10.238.20.220:8181,10.238.21.31:8181,10.238.21.121:8181,那么 10.238.13.12:8181 与 10.238.13.24:8181 很可能在同一个 Node 上,10.238.21.31:8181 和 10.238.21.121:8181 很可能在同一个 Node 上。我们重试,需要优先重试与之前重试过的实例尽量不在同一个 Node 上的实例,因为同一个 Node 上的实例只要有一个有问题或者压力过大,其他的基本上也有问题或者压力过大。
最后,如果调用某个实例一直失败,那么这个实例的调用优先级需要排在其他正常的实例后面。这个对于减少快速刷新发布(一下子启动很多实例之后停掉多个老实例,实例个数大于重试次数配置)对于用户的影响,以及某个可用区突然发生异常导致多个实例下线对用户的影响,以及业务压力已经过去,压力变小后,需要关掉不再需要的实例,导致大量实例发生迁移的时候对用户的影响,有很大的作用。
我们针对上面三个问题,提出了一种优化后的解决方案:
具体实现是:以下的代码来自于:https://github.com/JoJoTec/spring-cloud-parent
我们使用了依赖:
<dependency> <groupId>io.dropwizard.metrics</groupId> <artifactId>metrics-core</artifactId> </dependency>
记录实例数据的缓存类:
@Log4j2 public class ServiceInstanceMetrics { private static final String CALLING = "-Calling"; private static final String FAILED = "-Failed"; private MetricRegistry metricRegistry; ServiceInstanceMetrics() { } public ServiceInstanceMetrics(MetricRegistry metricRegistry) { this.metricRegistry = metricRegistry; } /** * 记录调用实例 * @param serviceInstance */ public void recordServiceInstanceCall(ServiceInstance serviceInstance) { String key = serviceInstance.getHost() + ":" + serviceInstance.getPort(); metricRegistry.counter(key + CALLING).inc(); } /** * 记录调用实例结束 * @param serviceInstance * @param isSuccess 是否成功 */ public void recordServiceInstanceCalled(ServiceInstance serviceInstance, boolean isSuccess) { String key = serviceInstance.getHost() + ":" + serviceInstance.getPort(); metricRegistry.counter(key + CALLING).dec(); if (!isSuccess) { //不成功则记录失败 metricRegistry.meter(key + FAILED).mark(); } } /** * 获取正在运行的调用次数 * @param serviceInstance * @return */ public long getCalling(ServiceInstance serviceInstance) { String key = serviceInstance.getHost() + ":" + serviceInstance.getPort(); long count = metricRegistry.counter(key + CALLING).getCount(); log.debug("ServiceInstanceMetrics-getCalling: {} -> {}", key, count); return count; } /** * 获取最近一分钟调用失败次数分钟速率,其实是滑动平均数 * @param serviceInstance * @return */ public double getFailedInRecentOneMin(ServiceInstance serviceInstance) { String key = serviceInstance.getHost() + ":" + serviceInstance.getPort(); double rate = metricRegistry.meter(key + FAILED).getOneMinuteRate(); log.debug("ServiceInstanceMetrics-getFailedInRecentOneMin: {} -> {}", key, rate); return rate; } }
负载均衡核心代码:
private final LoadingCache<Long, Set<String>> calledIpPrefixes = Caffeine.newBuilder() .expireAfterAccess(3, TimeUnit.MINUTES) .build(k -> Sets.newConcurrentHashSet()); private final String serviceId; private final Tracer tracer; private final ServiceInstanceMetrics serviceInstanceMetrics; //每次重试,其实都会调用这个 choose 方法重新获取一个实例 @Override public Mono<Response<ServiceInstance>> choose(Request request) { Span span = tracer.currentSpan(); return serviceInstanceListSupplier.get().next() .map(serviceInstances -> { //保持 span 和调用 choose 的 span 一样 try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) { return getInstanceResponse(serviceInstances); } }); } private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) { if (serviceInstances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } //读取 spring-cloud-sleuth 的对于当前请求的链路追踪上下文,获取对应的 traceId Span currentSpan = tracer.currentSpan(); if (currentSpan == null) { currentSpan = tracer.newTrace(); } long l = currentSpan.context().traceId(); return getInstanceResponseByRoundRobin(l, serviceInstances); } @VisibleForTesting public Response<ServiceInstance> getInstanceResponseByRoundRobin(long traceId, List<ServiceInstance> serviceInstances) { //首先随机打乱列表中实例的顺序 Collections.shuffle(serviceInstances); //需要先将所有参数缓存起来,否则 comparator 会调用多次,并且可能在排序过程中参数发生改变(针对实例的请求统计数据一直在并发改变) Map<ServiceInstance, Integer> used = Maps.newHashMap(); Map<ServiceInstance, Long> callings = Maps.newHashMap(); Map<ServiceInstance, Double> failedInRecentOneMin = Maps.newHashMap(); serviceInstances = serviceInstances.stream().sorted( Comparator //之前已经调用过的网段,这里排后面 .<ServiceInstance>comparingInt(serviceInstance -> { return used.computeIfAbsent(serviceInstance, k -> { return calledIpPrefixes.get(traceId).stream().anyMatch(prefix -> { return serviceInstance.getHost().contains(prefix); }) ? 1 : 0; }); }) //当前错误率最少的 .thenComparingDouble(serviceInstance -> { return failedInRecentOneMin.computeIfAbsent(serviceInstance, k -> { double value = serviceInstanceMetrics.getFailedInRecentOneMin(serviceInstance); //由于使用的是移动平均值(EMA),需要忽略过小的差异(保留两位小数,不是四舍五入,而是直接舍弃) return ((int) (value * 100)) / 100.0; }); }) //当前负载请求最少的 .thenComparingLong(serviceInstance -> { return callings.computeIfAbsent(serviceInstance, k -> serviceInstanceMetrics.getCalling(serviceInstance) ); }) ).collect(Collectors.toList()); if (serviceInstances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } ServiceInstance serviceInstance = serviceInstances.get(0); //记录本次返回的网段 calledIpPrefixes.get(traceId).add(serviceInstance.getHost().substring(0, serviceInstance.getHost().lastIndexOf("."))); //目前记录这个只为了兼容之前的单元测试(调用次数测试) positionCache.get(traceId).getAndIncrement(); return new DefaultResponse(serviceInstance); }
对于记录实例数据的缓存何时更新,是在 FeignClient 粘合重试,断路以及线程隔离的代码中的,这个我们下一节就会看到。
1. 为何没有使用所有微服务共享的缓存来保存调用数据,来让这些数据更加准确?
共享缓存的可选方案包括将这些数据记录放入 Redis,或者是 Apache Ignite 这样的内存网格中。但是有两个问题:
每个微服务使用本地缓存,记录自己调用其他实例的数据,在我们这里看来,不仅是更容易实现,也是更准确的做法。
2. 采用 EMA 的方式而不是请求窗口的方式统计最近错误率
采用请求窗口的方式统计,肯定是最准确的,例如我们统计最近一分钟的错误率,就将最近一分钟的请求缓存起来,读取的时候,将缓存起来的请求数据加在一起取平均数即可。但是这种方式在请求突增的时候,可能会占用很多很多内存来缓存这些请求。同时计算错误率的时候,随着缓存请求数的增多也会消耗更大量的 CPU 进行计算。这样做很不值得。
EMA 这种滑动平均值的计算方式,常见于各种性能监控统计场景,例如 JVM 中 TLAB 大小的动态计算,G1 GC Region 大小的伸缩以及其他很多 JVM 需要动态得出合适值的地方,都用这种计算方式。他不用将请求缓存起来,而是直接用最新值乘以一个比例之后加上老值乘以 (1 - 这个比例),这个比例一般高于 0.5,表示 EMA 和当前最新值更加相关。
但是 EMA 也带来另一个问题,我们会发现随着程序运行小数点位数会非常多,会看到类似于如下的值:0.00000000123, 0.120000001, 0.120000003, 为了忽略过于细致差异的影响(其实这些影响也来自于很久之前的错误请求),我们只保留两位小数进行排序。
微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer: