ReplayingDecoder可以重复解码的解码器,此类的核心原理是内部包含了一个ReplayingDecoderByteBuf,当读取字节不够时则抛出异常,ReplayingDecoder捕获异常还原读取readerIndex然后等待Netty下一次事件继续读取。
ReplayingDecoderByteBuf集成了Bytebuf,它代理了ByteBuf当中读取的方法。
final class ReplayingDecoderByteBuf extends ByteBuf
我们拿readInt来分析,在真正读取之前先检查可读取字节长度checkReadableBytes,如果不够读的则抛出异常。
@Override public int readInt() { checkReadableBytes(4); return buffer.readInt(); } private void checkReadableBytes(int readableBytes) { if (buffer.readableBytes() < readableBytes) { throw REPLAY; } }
callDecode方法会将Bytebuf包装成ReplayingDecoderByteBuf,子类解码时如果字节流长度不够则抛出异常,捕获异常后还原Bytebuf的readerIndex位置然后等待Netty下次事件回调。
@Override protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { //包装ByteBuf到内部 replayable.setCumulation(in); try { //如果in可读 while (in.isReadable()) { //记录in的读取下标位置 int oldReaderIndex = checkpoint = in.readerIndex(); int outSize = out.size(); //如果存在已解码的对象则fire到下一个handler if (outSize > 0) { fireChannelRead(ctx, out, outSize); out.clear(); if (ctx.isRemoved()) { break; } outSize = 0; } S oldState = state; //可读字节数量 int oldInputLength = in.readableBytes(); try { //调用解码逻辑-由子类实现 //由于传入了replayable对象,在子类解码实现中读取字节不够则抛出Signal异常 decodeRemovalReentryProtection(ctx, replayable, out); if (ctx.isRemoved()) { break; } //outSize == out.size()说明子类解码没解析出数据 if (outSize == out.size()) { //oldInputLength == in.readableBytes()说明字节流有变化并且oldState == state则抛出异常 if (oldInputLength == in.readableBytes() && oldState == state) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " + "data or change its state if it did not decode anything."); } else { // Previous data has been discarded or caused state transition. // Probably it is reading on. continue; } } } catch (Signal replay) { replay.expect(REPLAY); if (ctx.isRemoved()) { break; } //如果解码时出现异常,说明in的字节不够读取 int checkpoint = this.checkpoint; if (checkpoint >= 0) { //还原in的读取位置 in.readerIndex(checkpoint); } else { // Called by cleanup() - no need to maintain the readerIndex // anymore because the buffer has been released already. } break; } if (oldReaderIndex == in.readerIndex() && oldState == state) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " + "or change its state if it decoded something."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Exception cause) { throw new DecoderException(cause); } }
ReplayingDecoder和ByteToMessageDecoder之间的最大区别在于ReplayingDecoder允许您实现decode()和decodeLast()方法,就像已经接收到所有必需字节一样,而不是检查所需字节的可用性。 例如,以下ByteToMessageDecoder实现:
public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { //检查是否有4个字节,没有则返回等待下次netty事件 if (buf.readableBytes() < 4) { return; } //标记读取索引 buf.markReaderIndex(); //读取4个字节返回Int数字代表包的长度 int length = buf.readInt(); //剩余字节如果不够length if (buf.readableBytes() < length) { //还原读取索引等待下次netty事件 buf.resetReaderIndex(); return; } //读取length个字节放入解码器 out.add(buf.readBytes(length)); } }
简化实现
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> { protected void decode(ChannelHandlerContext ctx, ByteBuf buf) throws Exception { out.add(buf.readBytes(buf.readInt())); } }
错误的做法
public class MyDecoder extends ReplayingDecoder<Void> { private final Queue<Integer> values = new LinkedList<Integer>(); @Override public void decode(.., ByteBuf buf, List<Object> out) throws Exception { //错误的做法,因为第一次offer可能成功,第二次失败,然后buf被还原,netty下次事件 //导致消息又重新读了2次,那么队列中会有3个对象。 // A message contains 2 integers. values.offer(buf.readInt()); values.offer(buf.readInt()); // This assertion will fail intermittently since values.offer() // can be called more than two times! assert values.size() == 2; out.add(values.poll() + values.poll()); } }
正确的做法
public class MyDecoder extends ReplayingDecoder<Void> { private final Queue<Integer> values = new LinkedList<Integer>(); @Override public void decode(.., ByteBuf buf, List<Object> out) throws Exception { // Revert the state of the variable that might have been changed // since the last partial decode. //每次清空变量 values.clear(); // A message contains 2 integers. values.offer(buf.readInt()); values.offer(buf.readInt()); // Now we know this assertion will never fail. assert values.size() == 2; out.add(values.poll() + values.poll()); } }
性能提升技巧,设置还原点,通过checkpoint方法,获取bytebuf的读取索引位置,当需要还原时只还原到checkpoint的位置,而不是还原到开始位置。
/** * Stores the internal cumulative buffer's reader position. */ protected void checkpoint() { checkpoint = internalBuffer().readerIndex(); } /** * Stores the internal cumulative buffer's reader position and updates * the current decoder state. */ protected void checkpoint(S state) { checkpoint(); state(state); }
public enum MyDecoderState { READ_LENGTH, READ_CONTENT; } public class IntegerHeaderFrameDecoder extends ReplayingDecoder<MyDecoderState> { //包的长度 private int length; //设置初始状态 public IntegerHeaderFrameDecoder() { // Set the initial state. super(MyDecoderState.READ_LENGTH); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { switch (state()) { //读取包长状态 case READ_LENGTH: //读取包长-失败出异常等待Netty下次事件重新读 length = buf.readInt(); //读取成功设置还原点并更新状态 checkpoint(MyDecoderState.READ_CONTENT); case READ_CONTENT: //读取length长度的包,如果字节不够抛出异常,还原到上面设置的还原点,避免每次都读取//buf.readInt(); //读取成功设置还原点,更新状态 ByteBuf frame = buf.readBytes(length); checkpoint(MyDecoderState.READ_LENGTH); out.add(frame); break; default: throw new Error("Shouldn't reach here."); } } }
上面的例子,length = buf.readInt(); checkpoint(MyDecoderState.READ_CONTENT);
读取readInt成功后,就设置还原点,接着继续读取buf.readBytes(length);,如果这时字节长度不够,就会抛出异,系统会还原ByteBuf到上次的还原点,等待下次Netty事件,避免了系统反复调用length = buf.readInt();