上一篇博客中介绍了IdleStateHandler的使用场景及源码分析,我们可以使用IdleStateHandler来进行心跳检测。
除了这个,还有两个Handler与该IdleStateHandler功能类似,是作为其的补充。本文就来介绍下。
/** * Raises a {@link ReadTimeoutException} when no data was read within a certain * period of time. */ public class ReadTimeoutHandler extends IdleStateHandler { }
看注释就很明确了,如果在指定的时间内没有数据被读取,则抛出一个ReadTimeoutException。
我们来直接分析下源码
public class ReadTimeoutHandler extends IdleStateHandler { private boolean closed; // 默认以秒为单位 public ReadTimeoutHandler(int timeoutSeconds) { this(timeoutSeconds, TimeUnit.SECONDS); } public ReadTimeoutHandler(long timeout, TimeUnit unit) { super(timeout, 0, 0, unit); } // 这个重写了IdleStateHandler.channelIdle()方法 @Override protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { assert evt.state() == IdleState.READER_IDLE; readTimedOut(ctx); } protected void readTimedOut(ChannelHandlerContext ctx) throws Exception { // 如果当前Handler没有关闭,则直接抛出一个ReadTimeoutException并关闭当前channel if (!closed) { ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE); ctx.close(); closed = true; } } }
代码比较少,重要的就是重写了IdleStateHandler.channelIdle()方法。
我们可以来看下IdleStateHandler.channelIdle()方法
public class IdleStateHandler extends ChannelDuplexHandler { protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { ctx.fireUserEventTriggered(evt); } }
原来的处理中直接将READ_IDLE的事件传递下到下一个ChannelHandler,下一个ChannelHandler捕获到该事件后做相应的处理。
而ReadTimeoutHandler处理更直接,不需要下一个ChannelHandler处理了,直接抛出异常,关闭channel。
// Raises a {@link WriteTimeoutException} when a write operation cannot finish in a certain period of time. public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter { }
从注释中我们了解到WriteTimeoutHandler与ReadTimeoutHandler比较类似,如果在指定时间内写操作没有完成的话,则直接抛出WriteTimeoutException异常。
这个与IdleStateHandler中的write事件监测有所不同(IdleStateHandler监测的是在指定时间内没有发生写事件,则发送WRITE_IDLE事件),需要注意
public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter { private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1); // 指定超时时间 private final long timeoutNanos; // 一个双向链表的task,后续会分析 private WriteTimeoutTask lastTask; private boolean closed; public WriteTimeoutHandler(int timeoutSeconds) { this(timeoutSeconds, TimeUnit.SECONDS); } // 对写超时时间进行设置 public WriteTimeoutHandler(long timeout, TimeUnit unit) { ObjectUtil.checkNotNull(unit, "unit"); if (timeout <= 0) { timeoutNanos = 0; } else { timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS); } } }
public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (timeoutNanos > 0) { promise = promise.unvoid(); // 创建一个定时任务,具体见下面 scheduleTimeout(ctx, promise); } ctx.write(msg, promise); } private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise promise) { // 创建一个WriteTimeoutTask,具体见2.3 final WriteTimeoutTask task = new WriteTimeoutTask(ctx, promise); // 在延迟timeoutNanos后执行task task.scheduledFuture = ctx.executor().schedule(task, timeoutNanos, TimeUnit.NANOSECONDS); // 如果task没有执行结束,则将当前task添加到lastTask的后一个节点,并添加监听 if (!task.scheduledFuture.isDone()) { addWriteTimeoutTask(task); promise.addListener(task); } } }
private final class WriteTimeoutTask implements Runnable, ChannelFutureListener { private final ChannelHandlerContext ctx; private final ChannelPromise promise; // WriteTimeoutTask is also a node of a doubly-linked list WriteTimeoutTask prev; WriteTimeoutTask next; ScheduledFuture<?> scheduledFuture; WriteTimeoutTask(ChannelHandlerContext ctx, ChannelPromise promise) { this.ctx = ctx; this.promise = promise; } @Override public void run() { // 当前任务执行时机就是在经过timeoutNanos延时后执行的,如果这时write任务还没有完成,说明已经超时了 if (!promise.isDone()) { try { // 超时则抛出异常,具体见2.3.1 writeTimedOut(ctx); } catch (Throwable t) { ctx.fireExceptionCaught(t); } } removeWriteTimeoutTask(this); } // 用于监听write事件的完成,完成后,直接取消scheduledFuture,并将当前task从链表中删除 @Override public void operationComplete(ChannelFuture future) throws Exception { // scheduledFuture has already be set when reaching here scheduledFuture.cancel(false); removeWriteTimeoutTask(this); } }
2.3.1 WriteTimeoutHandler.writeTimedOut()超时触发异常
public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter { protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception { if (!closed) { // 直接抛出异常,并关闭连接 ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE); ctx.close(); closed = true; } } }
重要的处理都放在WriteTimeoutTask中,重点还是理解这个定时任务的执行时机,该WriteTimeoutTask 是延时timeoutNanos后执行的,所以,按照该Handler的含义来说,在timeoutNanos后,当前write操作应对是done的,如果没有结束,则直接抛出异常即可。