2021SC@SDUSC
下面分析一下出站数据传播的细节。我们从ChannelOutboundHandlerAdapter的write方法开始分析:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); }
如果想要实现自己的业务处理逻辑,需要继承ChannelOutboundHandlerAdapter并且重写write方法。处理完数据之后,还需要继续向下传递数据,也就是需要调用ctx.write(msg, promise)方法,它会调用AbstractChannelHandlerContext的等效方法,下面是一个它调用链路上的方法:
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
这里最关键的是通过findContextOutbound方法来寻找下一个符合要求的handler:
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
这里从当前的handler,顺着prev往回找,找下一个outband为真的handler。看到这个方法可能会觉得很眼熟,因为和上一次博客中分析的findContextInbound(MASK_CHANNEL_READ)方法很相似,这个方法在入站事件传播时用于寻找下一个符合要求的ChannelHandler:
private AbstractChannelHandlerContext findContextInbound(int mask) { AbstractChannelHandlerContext ctx = this; EventExecutor currentExecutor = executor(); do { ctx = ctx.next; } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND)); return ctx; }
其实write事件原理和上次分析的read事件类似,区别就是方向和目标不同。入站事件的方向是next方向,目标是InboundHandler。出站事件的方向与入站事件相反,是prev方向,也就是往回传播,目标是OutboundHandler。所以从代码中我们可以看到,这个寻找是顺着prev的方向往回找,也就是出站事件的传播方向为从后往前。
到这里我们明白了ChannelPipeline的事件传播方向。现在来考虑对于入站数据,如果传递到了tail节点,到头了,会有怎样的处理?对于出站数据,如果传递到了head节点,会有怎样的处理?
来看一下 invokeWriteAndFlush方法,它有一个判断 invokeHandler()方法,下面是它的细节:
private boolean invokeHandler() { int handlerState = this.handlerState; return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING); }
我们重点关注如果 handlerState == ADD_COMPLETE这个条件成立的时候,会返回true,那么就会执行下面的方法:
invokeWrite0(msg, promise);
private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
我们先来考虑这个判断条件什么时候会成立。这里需要回到我们 HeadContext和TailContext的构造函数上:
HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); } TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); setAddComplete(); }
final void setAddComplete() { for (;;) { int oldState = handlerState; // Ensure we never update when the handlerState is REMOVE_COMPLETE already. // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not // exposing ordering guarantees. if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) { return; } } }
这里我们可以看到两个构造方法后面都会调用setAddComplete()方法,这个方法的代码我直接贴在后面了。这里表达的意思是进入一个无限循环,我们不会改变更新,直到oldState的状态改变为REMOVE_COMPLETE。然后会执行 HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)这个原子操作,将handlerState设置为ADD_COMPLETE。这样上面函数的成立条件就解释清楚了。
然后我们来看之后执行的操作:
private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
我们先看handler()这个方法,这个方法是返回当前context。也就是说,会执行TailContext的等效方法。比如read,就会执行TailContext的channelRead方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { onUnhandledInboundMessage(msg); } protected void onUnhandledInboundMessage(Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg); } finally { ReferenceCountUtil.release(msg); } }
可以看到消息传递到tail之后,就不会再传递下去了,并且会释放它。出站事件也是类似,当事件传递到head的时候,会调用HeadContext的等效方法,然后就不会再传递下去了。