ChannelHandlerContext.write()
(或者writeAndFlush()
) 方法返回ChannelFuture对象,一个ChannelFuture对象代表尚未发生的IO操作,因为在Netty中所有的操作都是异步的,下面的方法可能会在发送消息之前关闭连接。
Channel ch = ...; ch.writeAndFlush(message); ch.close();
因为在Netty中操作是异步的,ch.writeAndFlush(message)也是异步的,该方法只是把发送消息加入了任务队列,这时直接关闭连接会导致问题。所以我们需要在消息发送完毕后在去关闭连接。
final ChannelFuture f = ctx.writeAndFlush(time); f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { ctx.close(); } });
通过ChannelFuture我们可以添加Listener,那么在消息发送完成后会进行回调,我们再去处理关闭连接等业务逻辑。
下面给出添加Listener的源码分析
@Override public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); //把listener对象加入到数组中 synchronized (this) { addListener0(listener); } //判断当前任务是否已经完成,如完成这里直接触发回调 if (isDone()) { notifyListeners(); } return this; }
addListener0方法,负责把用户一个一个添加的listener对象转换为数组结构DefaultFutureListeners,存储到listeners 成员变量
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners == null) { listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).add(listener); } else { listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener); } }
notifyListeners方法,将通知回调任务添加到eventloop当中,那么eventloop当中的任务顺序就是- ctx.writeAndFlush(time),notifyListeners,这就保证了再发送消息完毕后,会执行notifyListeners去回调监听器
private void notifyListeners() { EventExecutor executor = executor(); if (executor.inEventLoop()) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } }); }