之前在项目中使用到了并发队列,场景为多写多读,查阅资料推荐使用ConcurretLinkedQueue,但不知道为什么。这里对并发队列ConcurrentLinkedQueue与LinkedBlockingQueue的源码做一个简单分析,比较一下两者差别,并测试在不同并发请求下读写的性能差异。使用的JDK版本为1.8。
使用方法很简单,该类实现了Queue接口,提供了offer()、poll()等入队和出队的操作接口。
多线程环境下的使用如下:
// 无界并发队列 ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>(); // 模拟n个线程竞争环境 int n = 100; CountDownLatch countDownLatch = new CountDownLatch(n); for (int i = 0; i < n; i++) { int finalI = i; new Thread(()->{ // 进行10000次的写操作 for (int j = 0; j < 10000; j++) { queue.add(j); } // 进行10000次的读操作 for (int j = 0; j < 10000; j++) { queue.poll(); } // 该线程结束读写请求 System.out.println("Thread-"+ finalI +"结束"); countDownLatch.countDown(); }).start(); } // 直到所有线程结束读写 countDownLatch.await(); // 验证并发队列中元素是否清空 System.out.println("队列已清空:"+queue.isEmpty());
输出结果如下:
Thread-0结束 ........... Thread-55结束 队列已清空:true
该类使用了Node类来表示队列中的节点,包含一个volatile修饰的类型为传入泛型的item成员(节点存储的值)和volatile修饰的next指针。同时引入了Unsafe组件,使用了其CAS方法来替换item和next。其中lazySetNext()方法保证了volatile的语义,该次修改对下次读是可见的。
private static class Node<E> { volatile E item; volatile Node<E> next; Node(E item) { UNSAFE.putObject(this, itemOffset, item); } // CAS替换节点的值,返回是否成功 boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } // 给next引用赋值,这个方法保证了volatile的语义,即该修改对next读取是可见的 void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } // CAS替换next引用,返回是否成功 boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // unsafe类引入和相关静态代码 ... }
默认初始化方法如下:
public ConcurrentLinkedQueue() { // 创建空的头尾节点 head = tail = new Node<E>(null); }
还有一个基于已有集合的初始化方法,大致流程为:依次取出集合元素;检查是否为null;构建新节点;采用尾插法插入到链表尾部。
public ConcurrentLinkedQueue(Collection<? extends E> c) { Node<E> h = null, t = null; for (E e : c) { // 检查元素是否为null checkNotNull(e); // 基于集合中的元素构建新节点 Node<E> newNode = new Node<E>(e); // 第一个元素设置为头尾结点 if (h == null) h = t = newNode; else { // 其余元素采用尾插法插入 t.lazySetNext(newNode); t = newNode; } } // 集合为空集合时,新建值为nul的头尾节点 if (h == null) h = t = new Node<E>(null); head = h; tail = t; }
public boolean offer(E e) { // 确保元素非null,为null时抛出NullPointer异常 checkNotNull(e); // 基于传入值构造新节点 final Node<E> newNode = new Node<E>(e); // 自旋,直到入队成功 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; // case1:此时p为队尾节点,q=null if (q == null) { // 通过cas的方式设置新节点为p的后继节点 // 如果失败,说明此时p已不再是队尾结点,继续进行自旋 // 如果成功,尝试修改tail后返回true if (p.casNext(null, newNode)) { // p != t代表此时p和第一次循环时相比已经向后移动了,此时就通过CAS的方式将tail节点修改为newNode // 失败了也没关系,代表有其他线程已经修改了tail if (p != t) // hop two nodes at a time casTail(t, newNode); return true; } } // case2:p=q,表示是删除的节点 else if (p == q) // t != (t = tail) 说明t!=tail,tail节点已经更新过,此时就使用tail赋值给p,然后继续自旋 // 否则说明tail没有更新过,指向出队的节点。这时就使用head赋值给p,然后继续自旋 p = (t != (t = tail)) ? t : head; // case3:p不是队尾节点,也没有出队。就更新p,然后继续自旋 else // case3.1:p!=t且t!=tail时,说明tail节点更新过,让p重新指向tail节点 // case3.2:否则,p往后移动一位,指向q p = (p != t && t != (t = tail)) ? t : q; } }
入队的逻辑看起来比较复杂,其核心思想就是自旋+cas的方式将新节点插入到队尾节点的后面。
这里就按第一次入队和第二次入队两种情况分析一下:
首先检查非空,然后构造新节点。
t和p都指向tail节点,q为null。此时进入case1:尝试CAS设置p.next为newNode。
成功的话,说明节点入队成功了。然后直接返回true
失败的话,说明p.next!=null,p不是队尾节点了,这时就自旋,q=p.next,然后会进入case3.2的逻辑,更新p。再次自旋,q=p.next,然后会进入case1的逻辑,然后重复上面一样的操作,直到CAS设置成功。
首先检查非空,然后构造新节点。
tail节点指向倒数第二个节点,t和p指向tail,q指向最后一个节点。此时进入case3:,执行case3.2的逻辑,p = q。
然后自旋后,q=p.next,进入case1,然后CAS设置p.next为newNode。成功了的话,会发现p!=t,执行重置tail节点的操作,该操作失败了说明有其他线程重置了,所以也ok。之后返回true。
// 将原head(h指向head节点)更新为p // 并将原head节点next指向自己,表示当前节点已经出队 final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) // 将head通过CAS的方式更新为p h.lazySetNext(h); // 将h节点的next指向自己,表示出队 } public E poll() { restartFromHead: // 大循环 for (;;) { // 自旋 for (Node<E> h = head, p = h, q;;) { E item = p.item; // case1:p指向节点为第一个有元素节点(实质上要出队的节点) // cas的方式设置item,失败了的话说明有其他线程将该接节点出队了,会再次自旋 if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. // p!=h,表示p已经向后移动了。此时 if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } // case2:如果p的后继节点为null,表示p已经是最后一个节点,无节点可出队了 else if ((q = p.next) == null) { // 更新头节点为p,然后返回null updateHead(h, p); return null; } // case3:p=q,表示p和q指向的节点已经出队,通过p和q已无法找到头节点,这时需要重新去获取head节点 else if (p == q) // 回到大循环中重新开始小循环自旋 continue restartFromHead; // case4:将p指向q,实质上是q往后移动一位 else p = q; } } }
出队的核心思想就是找到头节点,CAS将其item设置为null。如果成功的话,就可以出队了,如果失败了,就自旋再次寻找头结点。
这里也分析一下出队执行步骤:
最开始的时候,head节点的item应该是null(queue初始化方法创建的节点)。第一次循环,h和p指向head节点。
如果此时队列中没有元素,会进入case2,直接更新head节点后返回null。
如果队列中有元素,会进入case4,将q向后移动,然后再次自旋,进行case1的判断。如果case1中item!=null且cas设置成功,则表示出队成功,返回出队元素。如果cas设置失败,则继续自旋寻找头结点出队。直至出队成功,同时如果p!=h,会更新下头结点。在自旋的过程中,如果当前节点已经被出队了,会进入case3,然后回到大循环重新寻找head节点。
// 返回p的后继节点,如果p已经出队(next指向自身),则返回head节点 final Node<E> succ(Node<E> p) { Node<E> next = p.next; // 当一个节点从队列删除后,其next指针会指向自己。此时就返回head节点 return (p == next) ? head : next; } // 获取队首节点 Node<E> first() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { boolean hasItem = (p.item != null); // p节点有元素,或者p节点为最后一个节点 if (hasItem || (q = p.next) == null) { // 更新头结点 updateHead(h, p); // p节点有元素返回p,无元素代表p是最后一个节点,返回null return hasItem ? p : null; } // 如果p已经出队,重新回到大循环 else if (p == q) continue restartFromHead; // p向后移动一位 else p = q; } } } public int size() { int count = 0; // 获取首元素后,遍历后继节点的数量 for (Node<E> p = first(); p != null; p = succ(p)) if (p.item != null) // Collection.size() spec says to max out if (++count == Integer.MAX_VALUE) break; return count; }
可以看到计算的大小不是非常准确的,从获取到首节点开始后,一直遍历到尾结点。期间增加的节点都能被统计进入,出队的节点则不计入数量。所以计算的数量>=计算完成时刻的实际数量。
LinkedBlockingQueue实现了Queue接口,也提供了offer和poll等方法。同时也提供了put和带时间参数的offer和pool方法。简单示例如下:
// 无界并发队列 LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3); // 插入一个元素,容量满时会失败 queue.offer(1); // 插入一个元素,容量满时最多等待2s queue.offer(2, 2, TimeUnit.SECONDS); // 插入一个元素,容量满时会一直等待,直到能够入队 queue.put(3); // 取出一个元素,无元素时返回null queue.poll(); // 取出一个元素,无元素时最多等待2s queue.poll(2, TimeUnit.SECONDS);
使用了Node节点存储元素,不过没有UNSAFE组件,没有CAS操作。后面也可以看到,使用了可重入锁(独占锁),所以不需要考虑多线程同时修改属性的情况。
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }
使用了head和last表示队列的头部和尾部节点,使用了入队锁和出队锁两个锁来实现同一时刻只有一个元素入队,同一时刻只有一个元素出队。使用了AotomicInteger类来表示队列中的元素个数。
transient Node<E> head; private transient Node<E> last; private final int capacity; private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
默认初始化方法,设置容量为Integer.MAX_VALUE
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
还有一个基于已有集合的初始化方法,大致思路为:
1.加上putLock入队锁;
2.遍历集合的所有元素,然后依次添加到队列中。
3.解锁。
由于使用了ReentrantLock,同一时刻只有单个线程入队,所以不用考虑并发问题。新增一个节点,然后将该节点添加到last节点后,最后更新last节点即可。
offer方法源码解析如下:需要注意,当入队时容量达到最大容量,会入队失败。
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; // 当前容量已满时,直接返回false if (count.get() == capacity) return false; int c = -1; // 构建新节点 Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; // 入队锁加锁,已经被其它线程加锁时,当前线程会park挂起 putLock.lock(); try { // 只有当前元素个数<capacity,才能入队 if (count.get() < capacity) { // 执行入队操作 enqueue(node); // count数量+1 c = count.getAndIncrement(); // 如果当前元素个数<capacity,表示还可以继续入队 if (c + 1 < capacity) // 唤醒一个在notFull的条件等待队列中的线程 notFull.signal(); } } finally { // 入队锁解锁 putLock.unlock(); } // 如果此时元素数量为1,表示可以出队 if (c == 0) // 唤醒一个在notEmpty的条件等待队列中的线程 signalNotEmpty(); // c>=表示入队成功,返回true,反之入队失败,返回false return c >= 0; } // 节点入队,加到队尾节点,然后更新last private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; }
put方法相对于offer方法,多了一个等待逻辑,当元素数量达到最大容量时,会一直等待,直到能够入队。
putLock.lockInterruptibly(); try { // 多了一个等待的过程 // 如果容量已满,当前线程park并进入notFull的条件等待队列 while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); }
同一时刻只有单个线程出队,所以不用考虑并发问题。
offer方法源码解析如下:需要注意,当入队时容量达到最大容量,会入队失败。
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; // 出队锁加锁 takeLock.lock(); try { // 只有数量>0时才能出队 if (count.get() > 0) { // 执行出队操作 x = dequeue(); // 容器数量-1 c = count.getAndDecrement(); // 当容器数量>=1时,唤醒notEmpty条件队列中等待的一个线程 if (c > 1) notEmpty.signal(); } } finally { // 出队锁释放 takeLock.unlock(); } // 表示当前数量<capacity时,容器未满,唤醒notFull条件队列中等待的一个线程 if (c == capacity) signalNotFull(); return x; } // 节点出队操作 private E dequeue() { // 获取队首节点以及下一个节点(队首节点值都是null,下一个节点才是真正有元素的节点) Node<E> h = head; Node<E> first = h.next; // h节点next指向自身,表示出队 h.next = h; // 更新head节点 head = first; // 返回第一个实际节点的值并重置为null(head节点的item都是null) E x = first.item; first.item = null; return x; }
take方法相比于poll,多了一个等待逻辑,当元素数量=0时,会一直等待,直到能够入队。
takeLock.lockInterruptibly(); try { // 多了一个等待的过程 // 如果数量=0,当前线程park并进入notEmpty的条件等待队列 while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); }
直接获取原子变量capacity的值即可。由于入队和出队对数量大小的修改都是原子的,所以获取的数量大小是十分准确的,为当前时刻容器元素数量。
public int size() { return count.get(); }
通过之前的介绍,可以发现
ConcurrentLinkedQueue全程是无锁的,而LinkedBlockingQueue多线程出入队时会有挂起和唤醒线程的操作,会进行线程的上下文切换,相对来说更耗时。
这里设置了几组不同的线程数量和并发读取次数,来测试各自的完成时间,每组数据测试5次,取平均数据。使用了同一台机器(4核CPU)进行测试。
代码设计如下:
// 无界并发队列 ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>(); long startTime = System.currentTimeMillis(); // 模拟n个线程竞争环境,各自完成m次插入和查找操作,计算最终完成时间 int n = 10; // 读写次数 int m = 10000; // 线程执行完成的计数器 CountDownLatch countDownLatch = new CountDownLatch(n); // 控制所有线程同时运行 CyclicBarrier cyclicBarrier = new CyclicBarrier(n); for (int i = 0; i < n; i++) { int finalI = i; new Thread(()->{ // 等待信号量的改变 try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } // 进行100000次的写操作 for (int j = 0; j < m; j++) { queue.add(j); } // 进行1000000次的读操作 for (int j = 0; j < m; j++) { queue.poll(); } // 该线程结束读写请求 System.out.println("Thread-"+ finalI +"结束"); countDownLatch.countDown(); }).start(); } // 直到所有线程结束读写,计算时间 countDownLatch.await(); long endTime = System.currentTimeMillis(); long costTime = endTime - startTime; System.out.println("所用时间:" + costTime + "ms"); // 验证并发队列中元素是否清空 System.out.println("队列已清空:"+queue.isEmpty()); 该次运行结果: Thread-9结束 ... Thread-8结束 所用时间:78ms 队列已清空:true
最终测试得到结果:
LinkedBlockingQueue测试结果(ms):
线程数量\读取次数 | 10000 | 50000 | 100000 |
---|---|---|---|
10 | 94 | 125 | 187 |
50 | 167 | 800 | 3109 |
100 | 266 | 1332 | 6168 |
200 | 503 | 5374 | 11365 |
ConcurrentLinkedQueue测试结果(ms):
线程数量\读取次数 | 10000 | 50000 | 100000 |
---|---|---|---|
10 | 78 | 156 | 249 |
50 | 172 | 594 | 1375 |
100 | 250 | 828 | 3343 |
200 | 437 | 1656 | 6300 |
可以发现,在线程数量较少时,两者的消耗时长差不多。当线程数量比较多,并且短时间内的读写请求数量较大时,ConcurrentLinkedQueue消耗时间明显更少。
https://zhuanlan.zhihu.com/p/224964810