@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 可以看到是基于ByteBuf进行解码的 if (msg instanceof ByteBuf) { // 把callldecode的解析的对象都放到out当中 CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; // 说明是第一次从io流里面读取数据 first = cumulation == null; if (first) { // 第一次把累加器赋值给刚读进来的对象 cumulation = data; } else { //不是第一次则将读进来的数据与cumulator进行累加 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } // 调用子类的decode方法进行解析 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { // We did enough reads already try to discard some bytes so we not risk to see a OOME. // See https://github.com/netty/netty/issues/4275 numReads = 0; discardSomeReadBytes(); } // 将解析出的bytebuf向下传播 int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); fireChannelRead(ctx, out, size); //跟进 fireChannelRead /* static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) { if (msgs instanceof CodecOutputList) { fireChannelRead(ctx, (CodecOutputList) msgs, numElements); } else { **msgs.get(i)-->Bytebuf ** for (int i = 0; i < numElements; i++) { ctx.fireChannelRead(msgs.get(i)); } } } */ // 回收list(可以看到和之前的entry一样,采用了对象池的机制) out.recycle(); } } else { ctx.fireChannelRead(msg); } }
上面提到了comulation,这里我们讲下
public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { ByteBuf buffer; if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1) { // Expand cumulation (by replace it) when either there is not more room in the buffer // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or // duplicate().retain(). // // See: // - https://github.com/netty/netty/issues/2327 // - https://github.com/netty/netty/issues/1764 // 扩容 buffer = expandCumulation(alloc, cumulation, in.readableBytes()); } else { buffer = cumulation; } // 写入cumulation buffer.writeBytes(in); in.release(); return buffer; } };
进入到calldecode方法中进行分析:
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) { // 一直检测bytebuf是否有数据可读 // 记录一下outsize的大小 int outSize = out.size(); // 如果outsize的大小>0,说明已经有解析出的对象,则将事件向下传播 if (outSize > 0) { fireChannelRead(ctx, out, outSize); // 清空out out.clear(); // Check if this handler was removed before continuing with decoding. // If it was removed, it is not safe to continue to operate on the buffer. // // See: // - https://github.com/netty/netty/issues/4635 if (ctx.isRemoved()) { break; } outSize = 0; } // 在decode之前记录一下可读长度 int oldInputLength = in.readableBytes(); // 进行解码 decode(ctx, in, out); // Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. // // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) { break; } // 说明没有解析出对象 if (outSize == out.size()) { // 说明本次读没有读取数据,当前累加的数据没有拼成一个完整的数据包 if (oldInputLength == in.readableBytes()) { break; } else { // 说明读到了数据,则有可能触发解析,所以continue,进行下次循环 continue; } } // 走到这里说明解析出对象,但是没有从cumulation中读取数据,则报错 if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } ........ }
public class LineBasedFrameDecoder extends ByteToMessageDecoder { /** Maximum length of a frame we're willing to decode. */ // 数据包最大长度 private final int maxLength; /** Whether or not to throw an exception as soon as we exceed maxLength. */ //超过最大长度的时候是否立即抛出异常,如果为true,则抛出 private final boolean failFast; // 最终解析出是否带换行符:true:不带换行符 private final boolean stripDelimiter; /** True if we're discarding input because we're already over maxLength. */ // 丢弃模式 private boolean discarding; // 解码到现在已经丢弃了多少字节 private int discardedBytes;
@Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } }
接下来我们就进入decode方法进行分析
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { // 找到这一行的结尾位置 final int eol = findEndOfLine(buffer); //判断是否是丢失模式,第一次为false if (!discarding) { if (eol >= 0) { final ByteBuf frame; //计算换行符到可读字节之间的长度 final int length = eol - buffer.readerIndex(); // 拿到分隔符长度 final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1; // 超过最大长度 if (length > maxLength) { //将readIndex移到换行符之后的位置 buffer.readerIndex(eol + delimLength); // 传播异常 fail(ctx, length); // 返回null 什么也没解析 return null; } // 判断是否要把分隔符也算在完整数据包下。 if (stripDelimiter) { // 不包含分隔符 frame = buffer.readRetainedSlice(length); buffer.skipBytes(delimLength); } else { // 不包含分隔符 frame = buffer.readRetainedSlice(length + delimLength); } return frame; } else { // 非丢弃模式下没有找到换行符 final int length = buffer.readableBytes(); if (length > maxLength) { // 超过最大长度 // 将length长度丢弃 discardedBytes = length; // 将读指针移动到写指针 buffer.readerIndex(buffer.writerIndex()); // 标记丢弃模式 discarding = true; // 传播异常 if (failFast) { fail(ctx, "over " + discardedBytes); } } // 啥也没解析到 return null; } } // 进入丢弃模式 else { // 找到endofline if (eol >= 0) { // 前面已经丢弃过的+这次要丢弃的 final int length = discardedBytes + eol - buffer.readerIndex(); final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1; // 将读指针移到换行符之后第一个有效数据位置 buffer.readerIndex(eol + delimLength); // 标记没有丢弃的数据了 discardedBytes = 0; // 标记discarding为非丢弃 discarding = false; if (!failFast) { fail(ctx, length); } } else { //没有找到endline,全部丢失,读指针与写指针置为相同 discardedBytes += buffer.readableBytes(); buffer.readerIndex(buffer.writerIndex()); } return null; } }
// 分隔符可以变化 public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters) { this(maxFrameLength, true, delimiters); }
@Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { // 由父类向下传播 out.add(decoded); } }
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { // 受限判断是不是lineBasedDecoder,当指定delimiter为/r/n /n 时,在创建 // DelimiterBasedFrameDecoder的时候就会创建lineBasedDecoder if (lineBasedDecoder != null) { return lineBasedDecoder.decode(ctx, buffer); } // Try all delimiters and choose the delimiter which yields the shortest frame. int minFrameLength = Integer.MAX_VALUE; ByteBuf minDelim = null; // 遍历分隔符找到其中一个分隔符划分最小数据包的长度,将此分隔符置为 minDelim,将 minFrameLength // 置为该分割符划分的长度 for (ByteBuf delim: delimiters) { int frameLength = indexOf(buffer, delim); if (frameLength >= 0 && frameLength < minFrameLength) { minFrameLength = frameLength; minDelim = delim; } } // 找到分割符不为null if (minDelim != null) { int minDelimLength = minDelim.capacity(); ByteBuf frame; // 判断是否处于丢弃模式,第一次不会处于丢弃模式, if (discardingTooLongFrame) { // 处于丢弃模式下 // We've just finished discarding a very large frame. // Go back to the initial state. // 标记为非丢弃模式 discardingTooLongFrame = false; //跳过这段数据包 buffer.skipBytes(minFrameLength + minDelimLength); //this.tooLongFrameLength:丢弃的字节数 int tooLongFrameLength = this.tooLongFrameLength; this.tooLongFrameLength = 0; if (!failFast) { fail(tooLongFrameLength); } return null; } // 超过规定最大数据包长度 if (minFrameLength > maxFrameLength) { // Discard read frame. buffer.skipBytes(minFrameLength + minDelimLength); fail(minFrameLength); return null; } // 是否包含分隔符 if (stripDelimiter) { frame = buffer.readRetainedSlice(minFrameLength); buffer.skipBytes(minDelimLength); } else { frame = buffer.readRetainedSlice(minFrameLength + minDelimLength); } return frame; } else { //没有找到分隔符:所有可读字段全部丢弃 // 处于非丢弃模式 if (!discardingTooLongFrame) { if (buffer.readableBytes() > maxFrameLength) { // 标记丢弃字节长度 // Discard the content of the buffer until a delimiter is found. tooLongFrameLength = buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); // 转换为丢弃模式 discardingTooLongFrame = true; if (failFast) { fail(tooLongFrameLength); } } } // 处于丢弃模式:已经丢弃+当前要丢弃的 else { // Still discarding the buffer since a delimiter is not found. tooLongFrameLength += buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); } return null; } }
public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder { // 下面介绍几个颇为重要的参数 private final ByteOrder byteOrder; private final int maxFrameLength; //指出包长度的偏移量 private final int lengthFieldOffset; // 指出数据包长度 private final int lengthFieldLength; private final int lengthFieldEndOffset; // 后面数据包实际长度等于 lengthFieldLength+lengthAdjustment private final int lengthAdjustment; // 在最后解析的数据包中需要跳过多少字节 private final int initialBytesToStrip; private final boolean failFast; private boolean discardingTooLongFrame; private long tooLongFrameLength; private long bytesToDiscard; @Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } } 下面介绍实际的逻辑,逻辑较为复杂,所以下面分为三个部分进行介绍 }
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { // 开启丢弃模式 if (discardingTooLongFrame) { long bytesToDiscard = this.bytesToDiscard; // 计算现在能够丢弃的数据 int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes()); in.skipBytes(localBytesToDiscard); bytesToDiscard -= localBytesToDiscard; this.bytesToDiscard = bytesToDiscard; // 通过failIfNecessary判断是否丢弃完了,然后从该方法中恢复非丢弃状态 failIfNecessary(false); } /* private void failIfNecessary(boolean firstDetectionOfTooLongFrame) { if (bytesToDiscard == 0) { // Reset to the initial state and tell the handlers that // the frame was too large. long tooLongFrameLength = this.tooLongFrameLength; this.tooLongFrameLength = 0; discardingTooLongFrame = false; if (!failFast || failFast && firstDetectionOfTooLongFrame) { fail(tooLongFrameLength); } } else { // Keep discarding and notify handlers if necessary. if (failFast && firstDetectionOfTooLongFrame) { fail(tooLongFrameLength); } } } */ // lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength; // 所以 lengthFieldEndOffset即为指向真正数据的前一个指针, if (in.readableBytes() < lengthFieldEndOffset) { // 说明此时还没有真正的数据可以读 return null; } int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset; // 表示从actualLengthFieldOffset起,读lengthFieldLength的value字节数,这个framelength // 相当于lengthFieldLength中所表示的值 long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder); if (frameLength < 0) { in.skipBytes(lengthFieldEndOffset); throw new CorruptedFrameException( "negative pre-adjustment length field: " + frameLength); } // 完整的数据包字节数=frame+前面指针值+lengthAdjustment frameLength += lengthAdjustment + lengthFieldEndOffset; if (frameLength < lengthFieldEndOffset) { in.skipBytes(lengthFieldEndOffset); throw new CorruptedFrameException( "Adjusted frame length (" + frameLength + ") is less " + "than lengthFieldEndOffset: " + lengthFieldEndOffset); } //(在这个逻辑处将会触发进入丢弃模式) if (frameLength > maxFrameLength) { // 计算可以丢弃的字节数 long discard = frameLength - in.readableBytes(); tooLongFrameLength = frameLength; // 如果小于0,则代表frameLength小于可以读的数据,所以这次数据全部丢掉 if (discard < 0) { // buffer contains more bytes then the frameLength so we can discard all now in.skipBytes((int) frameLength); } else { // discard>0,则代表这一次无法全部丢弃,保留仍待丢弃的数据,待下次在进行丢弃,并开启丢弃模式,丢弃字节数由bytesToDiscard保存,然后进入上面源码开始的地方,进行分析 // Enter the discard mode and discard everything received so far. discardingTooLongFrame = true; bytesToDiscard = discard; in.skipBytes(in.readableBytes()); } failIfNecessary(true); return null; } // never overflows because it's less than maxFrameLength int frameLengthInt = (int) frameLength; // 什么也不做,因为可读的数据流小于我们想要定义读的一个完整数据包 if (in.readableBytes() < frameLengthInt) { return null; } //(2):进入到跳过多少字节这一个步骤,首先如果initialBytesToStrip(跳过字节数)>frameLengthInt // 则抛出异常,因为这样子相当于没有任何意义。 if (initialBytesToStrip > frameLengthInt) { in.skipBytes(frameLengthInt); throw new CorruptedFrameException( "Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + initialBytesToStrip); } // 如果initialBytesToStrip合理,则跳过指定的字节数 in.skipBytes(initialBytesToStrip); // extract frame // 获取当前读指针 int readerIndex = in.readerIndex(); // 获取需要读取的真正字节数 int actualFrameLength = frameLengthInt - initialBytesToStrip; // 表示从buf里readindex指针处开始,抽取出actualFrameLength字节 ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength); // 然后将读指针进行向下偏移 in.readerIndex(readerIndex + actualFrameLength); return frame; }