之前的文章在分析NioEventLoop源码的时候,有提到过Netty没有用JDK提供的阻塞队列,而是使用了高性能无锁队列MpscQueue。因为篇幅原因,那篇文章并没有详细介绍MpscQueue,今天,它来啦!!!
在Netty较早的版本中,使用的是自己实现的任务队列,后来全部替换为JCTools工具的无锁化队列了,为啥呢?没别的,因为它的效率实在是太高了。
何为Mpsc???
JCTools提供了很多队列,大家需要针对不同的应用场景选择合适的队列,避免发生潜在的问题。
这里解释一下「MSPC」的含义,如下:
因此MpscQueue其实就是指:适用于多生产者,单消费者的高性能无锁队列!
之前的文章有说过,NioEventLoop是个单线程的线程池,提交到EventLoop的任务会被线程串行化执行。因此EventLoop的任务队列的生产消费模型是:多生产者,单消费者。
所以本篇文章只会重点分析MpscQueue,其他队列大家自行研究。
MpscQueue不是Netty提供的,因此在Netty项目里是看不到它的源码的,为了阅读方便,还是建议大家去单独拉JCTools的源码,地址:https://github.com/JCTools/JCTools。
Netty默认使用的队列是org.jctools.queues.MpscUnboundedArrayQueue
,这里只分析它。
MpscUnboundedArrayQueue是一个适用于「多生产者单消费者」的无界队列,这意味着它没有容量限制,你可以不断的往里面提交任务,即便没有消费者消费数据。
MpscUnboundedArrayQueue类的继承关系比较多,类图比较复杂,但是没关系,我们不用分析所有代码,只重点关注它的直接父类BaseMpscLinkedArrayQueue
即可,核心逻辑都在这里了,看懂BaseMpscLinkedArrayQueue
基本就知道它大概的一个实现思想了。
这里说明一下,MpscQueue的代码中存在大量类似如下代码:
byte b000,b001,b002,b003,b004,b005,b006,b007;// 8b byte b010,b011,b012,b013,b014,b015,b016,b017;// 16b byte b020,b021,b022,b023,b024,b025,b026,b027;// 24b
读者不用过分关注,忽略它即可,这些属性没有什么特别的用处,就是做字节填充用的,这涉及到CPU硬件缓存Cache Line,这里简单说下吧。
计算机除了有Memory主存,CPU每个核心还有自己单独的缓存,这些缓存按等级分为一级缓存、二级缓存等等,距离CPU核心越近的缓存效率越高。为啥CPU要有缓存?就是因为CPU的计算速度实在是太快了,相比之下内存的读写速度实在是太慢了,为了填补二者速度上的鸿沟,CPU被加入了多级缓存,CPU会将主存中的数据进行缓存,然后进行运算,运算完成后会在未来的某个时刻写入到主存。
CPU缓存数据的最小单位是Cache Line,在大多数CPU上,它的大小是64字节。只要Cache Line中的任一数据失效,整个Cache Line就会被认为是失效的,需要从主存中重新加载。
因此,如果Java对象/属性被缓存到同一个Cache Line上了,那就有可能因为其他线程修改了这一块的某个数据,导致所有线程的Cache Line全部失效,进而导致所有线程重新从主存中load,这会导致不必要的开销。
综上所述,MpscQueue做了优化,将可能会被频繁读写的数据,分配到不同的Cache Line,避免相互影响。
言归正传,回到MpscUnboundedArrayQueue,先说一下它的一个实现思路吧。
MpscUnboundedArrayQueue基本的数据结构由「数组+链表」组成,它有两个指针:producerBuffer和consumerBuffer,分别指向生产者生产和消费者消费对应的数组。它还有两个索引指针:producerIndex和consumerIndex,分别代表生产者生产和消费者消费的索引,这两个索引会以2为步长不断递增。
另外对于生产者,它还有一个producerLimit指针,它代表生产者生产消息的上限,达到该上限,Queue就要扩容了,扩容的方式是创建一个长度一样的新数组,然后旧数组的最后一个元素指向新数组,形成单向链表。
笔者画了一个简图,描述了MpscQueue的数据结构变化:
整体流程说的差不多了,下面开始分析源码,先看几个比较重要的属性:
// 生产者索引 private volatile long producerIndex; // 元素生产的限制,当producerIndex == producerLimit,代表队列需要扩容 private volatile long producerLimit; protected long producerMask; // 当前生产者指向的数组 protected E[] producerBuffer; // 消费者索引 private volatile long consumerIndex; protected long consumerMask; // 当前消费者指向的数组 protected E[] consumerBuffer; // 数组被生产者填满后,会填充一个JUMP,代表队列扩容了,消费者遇到JUMP会消费下一个数组。 private static final Object JUMP = new Object(); // 消费者消费完一个完整的数组后,会将最后一个元素设为BUFFER_CONSUMED。 private static final Object BUFFER_CONSUMED = new Object();
再看构造函数,需要给定一个chunkSize,指定块大小,MpscQueue由一系列数组构成,chunkSize就是数组的大小,它必须是一个2的幂次方数。
public MpscUnboundedArrayQueue(int chunkSize) { super(chunkSize); }
在父类的构造函数中,计算了mask,初始化了一个数组,并将producerBuffer和consumerBuffer都指向了同一个数组,然后根据mask设置producerLimit。
假设initialCapacity为8,数组的长度就是9,因为最后一个元素会用来存放扩容数组的地址,形成链表。每个数组还会预留一个槽位存放JUMP
元素,代表队列扩容了,消费者遇到JUMP
元素就会通过最后一个元素找到扩容后的数组继续消费,因此一个数组最多保留7个元素。
/** * 初始化 * @param initialCapacity 数组容量,要求是2的幂次方数 */ public BaseMpscLinkedArrayQueue(final int initialCapacity) { // initialCapacity必须大于等于2 RangeUtil.checkGreaterThanOrEqual(initialCapacity, 2, "initialCapacity"); // 容量确保是2的幂次方数,找到initialCapacity下一个2的幂次方数 int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity); // index以2为步长递增,预留一个元素存JUMP,所以limit为:(capacity-1)*2 long mask = (p2capacity - 1) << 1; // need extra element to point at next array // 需要一个额外元素来链接下一个数组 E[] buffer = allocateRefArray(p2capacity + 1); // 生产者和消费者Buffer指向同一个数组 producerBuffer = buffer; producerMask = mask; consumerBuffer = buffer; consumerMask = mask; // 设置producerLimit = mask soProducerLimit(mask); }
Queue初始化完成后,就是不断的生产数据和消费数据了,所以接下来重点分析offer()
和poll()
方法。
offer(e)
会将元素e添加到队列中,即生产数据。在MpscQueue中,队列是不能存放空数据的,所以首先会检查非空。然后线程通过CAS的方式以步长为2递增producerIndex,CAS会保证只有一个线程操作成功,CAS成功就代表线程抢到了数组中的槽位,它可以将元素e添加到数组的指定槽位。CAS失败代表并发失败了,会自旋重试。
上面说的是producerIndex还没有达到producerLimit的情况,如果达到producerLimit,代表生产达到上限,队列可能需要扩容了。offerSlowPath()
方法会判断队列是否需要扩容,如果需要扩容,也只会交给一个线程去扩容,这里又是一个CAS操作,线程以1为步长递增producerIndex,只有CAS成功的线程才会去执行扩容逻辑。
因此,在offer(e)
的逻辑中,还会判断producerIndex是否是奇数,如果为奇数就代表队列正在扩容。因为MpscQueue的扩容非常快速,它不需要迁移元素,只需要创建一个新数组,再和旧数组建立连接就可以了,所以没有必要让其他线程挂起,线程发现队列在扩容时,会进行自旋重试,直到扩容完成。
/** * 向队列中添加一个元素e,生产数据 * @param e * @return */ @Override public boolean offer(final E e) { if (null == e) { // 非空校验 throw new NullPointerException(); } long mask; E[] buffer;//生产者指向的数组 long pIndex;//生产索引 while (true) { long producerLimit = lvProducerLimit(); // 获取生产者索引 pIndex = lvProducerIndex(); // 生产索引以2为步长递增,一般不会是奇数,在offerSlowPath()中扩容线程会将其设为奇数 if ((pIndex & 1) == 1) { // 奇数代表正在扩容,自旋,等待扩容完成 continue; } mask = this.producerMask; buffer = this.producerBuffer; // 生产索引达到producerLimit,代表可能需要扩容。 if (producerLimit <= pIndex) { int result = offerSlowPath(mask, pIndex, producerLimit); switch (result) { case CONTINUE_TO_P_INDEX_CAS: //producerLimit虽然达到了limit,但是当前数组已经被消费了部分数据,暂时不会扩容,会使用已被消费的槽位。 break; case RETRY://CAS失败,重试 continue; case QUEUE_FULL://队列满,offer失败 return false; case QUEUE_RESIZE://需要扩容 resize(mask, buffer, pIndex, e, null); return true; } } if (casProducerIndex(pIndex, pIndex + 2)) { // CAS递增producerIndex成功,抢到槽位,跳出自旋 break; } } final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask); // 将buffer数组的指定位置替换为e,不是根据下标来设置的,是根据槽位的地址偏移量offset,UNSAFE操作。 soRefElement(buffer, offset, e); // release element e return true; }
offerSlowPath()
会告诉线程队列是满了,还是需要扩容,还是需要自旋重试。虽然producerIndex达到了producerLimit,但不代表队列就非得扩容,如果消费者已经消费了部分生产者指向的数组元素,就意味这当前数组还是有槽位可以继续用的,暂时不用扩容。
/** * @param mask * @param pIndex 生产者索引 * @param producerLimit 生产者limit * @return */ private int offerSlowPath(long mask, long pIndex, long producerLimit) { // 消费者索引 final long cIndex = lvConsumerIndex(); // 数组缓冲的容量,(长度-1) * 2 long bufferCapacity = getCurrentBufferCapacity(mask); // 消费索引+当前数组的容量 > 生产索引,代表当前数组已有部分元素被消费了,不会扩容,会使用已被消费的槽位。 if (cIndex + bufferCapacity > pIndex) { if (!casProducerLimit(producerLimit, cIndex + bufferCapacity)) { // CAS失败,自旋重试 return RETRY; } else { // 重试CAS修改 生产索引 return CONTINUE_TO_P_INDEX_CAS; } } // 根据生产者和消费者索引判断Queue是否已满,无解队列永不会满 else if (availableInQueue(pIndex, cIndex) <= 0) { return QUEUE_FULL; } // grab index for resize -> set lower bit // CAS的方式将producerIndex加1,奇数代表正在resize else if (casProducerIndex(pIndex, pIndex + 1)) { return QUEUE_RESIZE; } else { // resize失败,重试 return RETRY; } }
如果需要扩容,线程会CAS操作将producerIndex改为奇数,让其它线程能感知到队列正在扩容,要生产数据的线程先自旋,等待扩容完成再继续操作。
resize()
是扩容的核心方法,它首先会创建一个相同长度的新数组,将producerBuffer指向新数组,然后将元素e放到新数组中,旧元素的最后一个元素指向新数组,形成链表。还会将旧元素的槽位填充JUMP
元素,代表队列扩容了。
// 扩容:新建一个E[],将oldBuffer和newBuffer建立连接。 private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s) { assert (e != null && s == null) || (e == null || s != null); // 下一个Buffer的长度,MpscQueue会构建一个相同长度的Buffer int newBufferLength = getNextBufferSize(oldBuffer); final E[] newBuffer; try { // 创建一个新的E[] newBuffer = allocateRefArray(newBufferLength); } catch (OutOfMemoryError oom) { assert lvProducerIndex() == pIndex + 1; soProducerIndex(pIndex); throw oom; } // 生产者Buffer指向新的E[] producerBuffer = newBuffer; // 计算新的Mask,Buffer长度不变的情况下,Mask也不变 final int newMask = (newBufferLength - 2) << 1; producerMask = newMask; // 根据该偏移量设置oldBuffer的JUMP元素,会递增然后重置,不断循环 final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask); // Mask不变的情况下,oldBuffer的JUMP对应的位置,就是newBuffer中要消费的位置. final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask); // 元素e放到新数组中 soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e); // 旧数组和新数组建立连接,旧数组的最后一个元素保存新数组的地址。 soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer); // 消费者索引 final long cIndex = lvConsumerIndex(); // 根据消费者和生产者索引,校验Queue是否已满。对于无界队列,返回Integer.MAX_VALUE,永远都不会满。 final long availableInQueue = availableInQueue(pIndex, cIndex); RangeUtil.checkPositive(availableInQueue, "availableInQueue"); // 设置新的producerLimit soProducerLimit(pIndex + Math.min(newMask, availableInQueue)); /* 扩容的时候会将producerIndex设为pIndex+1,奇数代表正在扩容,非扩容线程会自旋重试,等待扩容完成。 现在元素已经放入队列,将producerIndex设为pIndex+2,让其他线程知道扩容完成。 */ soProducerIndex(pIndex + 2); /* 将旧数组的指定位置设为JUMP,消费者遇到JUMP就知道队列扩容了,会寻找next连接的数组。 */ soRefElement(oldBuffer, offsetInOld, JUMP); }
offer()
主要流程就这样,CAS抢槽位,确保只有单个线程能生产,CAS失败的线程自旋重试。如果遇到队列需要扩容,则将producerIndex设为奇数,其他线程自旋等待扩容完成,扩容后再设为偶数,通知其它线程继续生产。
元素生产好了,就是为了调用poll()
来进行消费的。
poll()
首先还是找到consumerBuffer指向的当前消费数组,根据消费索引consumerIndex计算要消费的元素相较于Array的内存地址偏移量,根据这个偏移量来获取元素。
如果元素为null,并不代表队列是空的,还要比较consumerIndex和producerIndex,如果两者索引不同,那么producerIndex肯定是大于consumerIndex的,说明生产者已经在生产了,移动了producerIndex,只是还没来得及将元素填充到数组而已。因为生产者是先CAS递增producerIndex,再将元素填充到数组的,两步之间存在一个非常短的时间差,如果消费者恰好在这个时间差内去消费数据,那么就自旋等待一下,等待生产者填充元素到数组。
如果元素为JUMP
,说明队列扩容了,消费者需要根据数组的最后一个元素找到扩容后的新数组,消费新数组的元素。
// MpscQueue是多生产单消费者的Queue,因此poll()没有做并发控制。 @Override public E poll() { /* consumerBuffer和producerBuffer会在Queue的构造函数中被初始化, 初始化时,两者会指向同一个数组。随着生产者不断生产数据,Queue扩容,producerBuffer会慢慢指向新的数组。 */ final E[] buffer = consumerBuffer; // 消费者索引 final long index = lpConsumerIndex(); final long mask = consumerMask; // 计算消费者需要消费的元素在数组中的地址偏移量 final long offset = modifiedCalcCircularRefElementOffset(index, mask); // 根据offset取出元素e Object e = lvRefElement(buffer, offset); if (e == null) { if (index != lvProducerIndex()) { /* offer()时生产者先CAS改producerIndex,再设置元素。 中间会有一个时间差,此时会自旋,等待元素设置完成。 */ do { e = lvRefElement(buffer, offset); } while (e == null); } else {//元素已经消费完 return null; } } if (e == JUMP) {// 代表队列扩容了 /* 通过当前数组的最后一个元素,获取下一个待消费的数组, 同时,消费者还会将最后一个元素设为BUFFER_CONSUMED,代表当前数组已经消费完毕。 */ final E[] nextBuffer = nextBuffer(buffer, mask); // 从新数组中消费元素 return newBufferPoll(nextBuffer, index); } // 取出元素后,将原来的槽位设为null soRefElement(buffer, offset, null); // 递增consumerIndex soConsumerIndex(index + 2); return (E) e; }
如果队列扩容了,nextBuffer()
会找到扩容后的新数组,同时它还会将旧数组的最后一个元素设为BUFFER_CONSUMED
,代表当前数组已经被消费完了,也就从链表中剔除了。
// 找到扩容后的新数组 private E[] nextBuffer(final E[] buffer, final long mask) { // 计算数组最后一个元素的地址偏移量 final long offset = nextArrayOffset(mask); // 找到下一个数组 final E[] nextBuffer = (E[]) lvRefElement(buffer, offset); // 消费者Buffer指向新数组 consumerBuffer = nextBuffer; // 重新计算Mask,数组长度不变的则Mask不变 consumerMask = (length(nextBuffer) - 2) << 1; // 将旧数组的最后一个元素设为BUFFER_CONSUMED,代表消费完毕。 soRefElement(buffer, offset, BUFFER_CONSUMED); return nextBuffer; }
得到新数组后,会调用newBufferPoll()
从新数组中消费数据:
// 从扩容后的新数组里消费数据,索引下标不变 private E newBufferPoll(E[] nextBuffer, long index) { // 根据consumerIndex计算要消费的元素相较于Array的内存偏移量 final long offset = modifiedCalcCircularRefElementOffset(index, consumerMask); // 根据offset取出这个元素 final E n = lvRefElement(nextBuffer, offset); if (n == null) {//offer()的元素不可能为null,一般不会进这个if throw new IllegalStateException("new buffer must have at least one element"); } // 元素取出后将那个那个槽位设为null soRefElement(nextBuffer, offset, null); // 递增consumerIndex soConsumerIndex(index + 2); return n; }
消费者取出数据后,会将数组原来的槽位中填充null,其实也就代表这个槽位没有使用,可以被复用。
至此,poll()
的消费流程也全部分析结束了,可以看到,全程都没有挂起线程,顶多就是自旋等待。
MpscQueue是一个「多生产者单消费者」的高性能无锁队列,符合Netty EventLoop的任务消费模型。
它用到了大量的CAS操作,对于需要做并发控制的地方,确保只有一个线程会执行成功,其他CAS失败的线程会自旋重试,全程都是无锁非阻塞的。不管是扩容,还是等待元素被填充到数组,这些过程都是会极快完成的,因此短暂的自旋会比线程挂起再唤醒效率更高。
MpscQueue由一系列数组构成,数组的最后一个元素指向下一个数组,形成单向链表。数组扩容后会在原槽位填充JUMP
元素,消费者遇到该元素就知道要寻找新数组继续消费了。
MpscQueue全程无锁化,非阻塞,相较于JDK提供的同步阻塞队列,性能有很好的提升,这也是Netty后来的版本将任务队列替换为JCtools的重要原因。