重试机制: 在发生异常时,重新尝试请求,多次还是失败时,才会抛出异常。
应用场景: 可能由于网络抖动出现第一次调用失败,尝试几次就可以恢复正常。
比如Spring提供的声明式的重试类库Spring-Retry
。
Feign Core
也提供了自己的重试机制,基于Retryer
接口。
Retryer接口位于 feign-core
包中,声明了一个连续重试方法,及一个从不重试的实例对象。
public interface Retryer extends Cloneable { // 不重试Retryer 示例对象,直接抛出异常 Retryer NEVER_RETRY = new Retryer() { public void continueOrPropagate(RetryableException e) { throw e; } public Retryer clone() { return this; } }; // 连续重试 void continueOrPropagate(RetryableException var1); }
Default
是Retryer 接口的默认实现类,也就是Feign 的默认重试策略。
public static class Default implements Retryer { // 最大访问次数(包含了第一次和重试的次数) private final int maxAttempts; // 重试的间隔时间 private final long period; // 最大重试间隔 private final long maxPeriod; // 当前访问次数 int attempt; // 总的重试间隔 long sleptForMillis; // 默认配置(重试间隔100毫秒、最大重试间隔1S、最多访问5次) public Default() { this(100L, TimeUnit.SECONDS.toMillis(1L), 5); } public Default(long period, long maxPeriod, int maxAttempts) { this.period = period; this.maxPeriod = maxPeriod; this.maxAttempts = maxAttempts; this.attempt = 1; } protected long currentTimeMillis() { return System.currentTimeMillis(); } // 默认重试算法 public void continueOrPropagate(RetryableException e) { // 如果重试的次数attempt大于最大重试次数,直接抛出异常 // attempt的初始值为1 ,i++是先进行运算,所以就表示当前重试次数,比较结束了之后再+1, // 当第attempt 为5时,也就是第五次进入这里,条件为true,则会直接抛出异常,不再进行请求发送。 // 所以最终是,重试了四次,总共请求了5次。 if (this.attempt++ >= this.maxAttempts) { throw e; } else { long interval; // 响应数据是否包含了 Retry-After 头,这个头用来告诉用户代理需要等待多长时间之后才能继续发送请求 if (e.retryAfter() != null) { interval = e.retryAfter().getTime() - this.currentTimeMillis(); if (interval > this.maxPeriod) { interval = this.maxPeriod; } if (interval < 0L) { return; } } else { // 计算重试时间间隔 interval = this.nextMaxInterval(); } try { // 线程睡眠时间间隔 第一次为 150ms Thread.sleep(interval); } catch (InterruptedException var5) { Thread.currentThread().interrupt(); throw e; } // 记录时间间隔总数 this.sleptForMillis += interval; } } // 计算公式=》 配置的间隔时间(默认100ms)* (1.5 的(当前重试次数)次方) // 接着判断 当前时间时间,是否超出了最大限制,超过了则返回最大时间间隔。 long nextMaxInterval() { long interval = (long)((double)this.period * Math.pow(1.5D, (double)(this.attempt - 1))); return interval > this.maxPeriod ? this.maxPeriod : interval; } public Retryer clone() { return new Retryer.Default(this.period, this.maxPeriod, this.maxAttempts); } }
默认是不进行重试,所以需要配置。
@Bean Retryer feignRetryer() { return new Retryer.Default(); }
在订单服务配置一个超长的线程睡眠,我们手动触发一个读取超时,然后由这个超时异常,由最里层往上进行分析,因为之前分析过很多次执行流程源码了,这里只着重看下异常。
这个异常是有底层HTTP 框架抛出的,会被负载均衡客户端捕获到,这里catch 了Exception
,所以异常都会被捕获,捕获了之后,会转化为ClientException
。
ClientException
又会被负载均衡客户端的execute
方法捕获到。因为SocketTimeoutException
也是IOException
,直接会直接抛出IOException
。
IOException
又会被方法处理器捕获到,然后转为RetryableException
(可以重启的异常)。
在上述代码的最后,调用FeignException
的errorExecuting
,可以看到,创建了一个RetryableException
,
static FeignException errorExecuting(Request request, IOException cause) { return new RetryableException(-1, String.format("%s executing %s %s", cause.getMessage(), request.httpMethod(), request.url()), request.httpMethod(), cause, (Date)null, request); }
最终,RetryableException
还是会被方法处理器的invoke
方法所捕获到,进行重试,直到重试到最大次数时,还是失败,就会抛出异常,结束循环,执行失败。
可以看到方法执行器,每次执行的时候,都会创建一个重试器,而且方法执行的时候,是一个死循环,只有抛出异常时或者正常返回时,才会结束。
public Object invoke(Object[] argv) throws Throwable { RequestTemplate template = this.buildTemplateFromArgs.create(argv); Options options = this.findOptions(argv); // 每次请求都会创建一个重试处理器 Retryer retryer = this.retryer.clone(); while(true) { try { return this.executeAndDecode(template, options); } catch (RetryableException var9) { RetryableException e = var9; // 捕获到RetryableException ,则会调用重试处理器,主要是进行执行时间计算,没有抛出异常,则继续循环 try { retryer.continueOrPropagate(e); } catch (RetryableException var8) { Throwable cause = var8.getCause(); if (this.propagationPolicy == ExceptionPropagationPolicy.UNWRAP && cause != null) { throw cause; } throw var8; } if (this.logLevel != Level.NONE) { this.logger.logRetry(this.metadata.configKey(), this.logLevel); } } } }
默认的重试算法,会有一个间隔时间,线程会休眠,然后再重新执行请求(在Default 源码中已经分析过了)。
计算公式=》 配置的间隔时间(默认100ms)* (1.5 的(当前重试次数)次方)。
long interval = (long)((double)this.period * Math.pow(1.5D, (double)(this.attempt - 1))); return interval > this.maxPeriod ? this.maxPeriod : interval;
这里可以举个例子,默认最大重试次数为5次,最大间隔时间为1秒,第一次休眠时间为100ms,由计算公式我们可以推倒出来如下结果:
1. 第一次重试,100*(1.5的1次方)间隔150 毫秒后再次请求 2. 第二次重试,100*(1.5的2次方)间隔225 毫秒后再次请求 3. 第三次重试,100*(1.5的3次方)间隔337 毫秒后再次请求 4. 第四次重试,100*(1.5的4次方)间隔506 毫秒后再次请求, 5. 第五次,这个时候,访问次数attempt 达到了最大值5次,不会再重试了,而是直接抛出异常,结束请求
通过以上源码分析,可知,只有IO 异常时,才会解析为可重试异常,进行重试操作。
和Feign 一样,Ribbon 也是有重试机制,接下来按照上面的套路分析下Ribbon 。
RxJava - JVM
响应式扩展Reactive Extensions 用于使用Java VM的可观察序列编写异步和基于事件的程序的库。
RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。RxJS 中含有两个基本概念:Observables
与 Observe
r。Observables
作为被观察者,是一个值或事件的流集合;而Observer
则作为观察者,根据 Observables
进行处理。
Observables 与 Observer 之间的订阅发布关系(观察者模式) 如下:
订阅:Observer 通过 Observable 提供的 subscribe() 方法订阅 Observable。
发布:Observable 通过回调 next 方法向 Observer 发布事件。
RetryHandler
接口是 Ribbon 的重试处理器,用来处理重试逻辑。
public interface RetryHandler { RetryHandler DEFAULT = new DefaultLoadBalancerRetryHandler(); // 判断当前异常是否应该重试 boolean isRetriableException(Throwable var1, boolean var2); // 是否是熔断类型异常; boolean isCircuitTrippingException(Throwable var1); // 获取当前节点最大重试次数 int getMaxRetriesOnSameServer(); // 获取调用不同节点的最大重试次数 int getMaxRetriesOnNextServer(); }
DefaultLoadBalancerRetryHandler
是RetryHandler
接口的默认实现类,重点是看它的属性和构造函数:
// 定义了可以重试的异常 private List<Class<? extends Throwable>> retriable = Lists.newArrayList(new Class[]{ConnectException.class, SocketTimeoutException.class}); // 定义了可以重试的异常 private List<Class<? extends Throwable>> circuitRelated = Lists.newArrayList(new Class[]{SocketException.class, SocketTimeoutException.class}); // 对应 MaxAutoRetries 配置 protected final int retrySameServer; // 对应 MaxAutoRetriesNextServer 配置 protected final int retryNextServer; // 是否开启重试 protected final boolean retryEnabled; // 没有参数时,表示不重试 public DefaultLoadBalancerRetryHandler() { this.retrySameServer = 0; this.retryNextServer = 0; this.retryEnabled = false; }
RequestSpecificRetryHandler
也实现了RetryHandler
接口,从名字上看,是具有请求特征的重试处理器,每次请求时,Ribbon 都会创建单独的一个RequestSpecificRetryHandler
(这是实际使用的处理器) ,也就是会和当前客户端的配置(IClientConfig对象)绑定,实现不同请求,不同配置策略。
可以看到,RequestSpecificRetryHandler
代理了一个RetryHandler
,默认是DefaultLoadBalancerRetryHandler
,看他们的参数,可以知道,默认Ribbon 重试机制也是关闭的。
FeignLoadBalancer
的executeWithLoadBalancer
方法中调用buildLoadBalancerCommand
方法构造LoadBalancerCommand
对象。这个类是将Ribbon
将请求转为RxJava API调用的实现。
该类的selectServer
方法,会在注册中心中,根据负载均衡算法获取到一个健康的可用服务,然后返回一个Observable
对象,这是一个观察者对象,创建的时候传入了一个Subscriber
订阅者对象,当Observable
对象被订阅时,Subscriber
中的call 方法会执行。
// 定义一个事件源 // Create操作符是用来创建一个Observable private Observable<Server> selectServer() { return Observable.create(new OnSubscribe<Server>() { // Observable 包含了一个OnSubscribe 对象 // 当Observable被订阅(subscribe)时,OnSubscribe接口的call方法会被执行 @Override public void call(Subscriber<? super Server> next) { try { // 事件源被订阅时,执行call // 负载均衡查询可用服务 Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey); // next.onNext(server); next.onCompleted(); } catch (Exception e) { next.onError(e); } } }); }
在submit
方法中,会创建一个Observable
对象,用来观察请求执行状态,如果失败,则会重试,达到最大次数,抛出异常,结束重试。
public Observable<T> submit(final ServerOperation<T> operation) { // 省略..... // 获取重试器中 最大重试次数 final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer(); final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer(); // (server == null ? selectServer() : Observable.just(server)) // 获取可用服务的Observable 对象 Observable<T> o = (server == null ? selectServer() : Observable.just(server)) // concatMap发射的数据集是有序的 .concatMap(new Func1<Server, Observable<T>>() { @Override // 为选中的每台服务器执行调用 public Observable<T> call(Server server) { context.setServer(server); // 获取当前服务的监控记录 final ServerStats stats = loadBalancerContext.getServerStats(server); // 为每次尝试和重试调用 Observable<T> o = Observable .just(server) .concatMap(new Func1<Server, Observable<T>>() { @Override public Observable<T> call(final Server server) { // 重试次数计数 context.incAttemptCount(); loadBalancerContext.noteOpenConnection(stats); if (listenerInvoker != null) { try { listenerInvoker.onStartWithServer(context.toExecutionInfo()); } catch (AbortExecutionException e) { return Observable.error(e); } } final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start(); // return operation.call(server).doOnEach(new Observer<T>() { private T entity; @Override public void onCompleted() { recordStats(tracer, stats, entity, null); // TODO: What to do if onNext or one rror are never called? } @Override public void one rror(Throwable e) { recordStats(tracer, stats, null, e); logger.debug("Got error {} when executed on server {}", e, server); if (listenerInvoker != null) { listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo()); } } @Override public void onNext(T entity) { this.entity = entity; if (listenerInvoker != null) { listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo()); } } private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) { tracer.stop(); loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler); } }); } }); // 针对同一实例的重试回调 if (maxRetrysSame > 0) o = o.retry(retryPolicy(maxRetrysSame, true)); return o; } }); // 重试下一个实例的回调 if (maxRetrysNext > 0 && server == null) o = o.retry(retryPolicy(maxRetrysNext, false)); // 重试超过次数则终止调用并设置对应异常的回调 return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() { // 封装异常信息并返回 @Override public Observable<T> call(Throwable e) { // 省略...... return Observable.error(e); } }); }
以下为全局配置,也可以添加名称前缀,指定客户端。
# Ribbon 配置 ribbon: MaxAutoRetries: 2 MaxAutoRetriesNextServer: 3 OkToRetryOnAllOperations: false
各参数说明如下:
重试总次数计算公式:
MaxAutoRetries+(MaxAutoRetries+1)*(MaxAutoRetriesNextServer)
第一次请求时异常,重试2次,查询下一个节点,发送一次请求失败,进入重试,再重试2次失败,继续查询三次节点重试,所以以上配置会重试(2+3*3=11)次
OkToRetryOnAllOperations
,不然很容易出现接口幂等性问题,而且下游服务的GET请求,是要求只做查询功能