PriorityBlockingQueue是一个支持优先级排序的无界阻塞队列。
Note:
PriorityBlockingQueue并不会阻塞生产者,而只是在没有可消费的数据时阻塞消费者;因此使用的时候需要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,最终会耗尽所有可用的内存空间。
PriorityBlockingQueue的类图如下:
PriorityBlockingQueue的定义如下:
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = 5595510919245408276L; /* * The implementation uses an array-based binary heap, with public * operations protected with a single lock. However, allocation * during resizing uses a simple spinlock (used only while not * holding main lock) in order to allow takes to operate * concurrently with allocation. This avoids repeated * postponement of waiting consumers and consequent element * build-up. The need to back away from lock during allocation * makes it impossible to simply wrap delegated * java.util.PriorityQueue operations within a lock, as was done * in a previous version of this class. To maintain * interoperability, a plain PriorityQueue is still used during * serialization, which maintains compatibility at the expense of * transiently doubling overhead. */ /** * Default array capacity. */ private static final int DEFAULT_INITIAL_CAPACITY = 11; /** * The maximum size of array to allocate. * Some VMs reserve some header words in an array. * Attempts to allocate larger arrays may result in * OutOfMemoryError: Requested array size exceeds VM limit */ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; /** * Priority queue represented as a balanced binary heap: the two * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The * priority queue is ordered by comparator, or by the elements' * natural ordering, if comparator is null: For each node n in the * heap and each descendant d of n, n <= d. The element with the * lowest value is in queue[0], assuming the queue is nonempty. */ private transient Object[] queue; /** * The number of elements in the priority queue. */ private transient int size; /** * The comparator, or null if priority queue uses elements' * natural ordering. */ private transient Comparator<? super E> comparator; /** * Lock used for all public operations. */ private final ReentrantLock lock = new ReentrantLock(); /** * Condition for blocking when empty. */ private final Condition notEmpty = lock.newCondition(); /** * Spinlock for allocation, acquired via CAS. */ private transient volatile int allocationSpinLock; /** * A plain PriorityQueue used only for serialization, * to maintain compatibility with previous versions * of this class. Non-null only during serialization/deserialization. */ private PriorityQueue<E> q;
其构造函数如下:
/** * Creates a {@code PriorityBlockingQueue} with the default * initial capacity (11) that orders its elements according to * their {@linkplain Comparable natural ordering}. */ public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } /** * Creates a {@code PriorityBlockingQueue} with the specified * initial capacity that orders its elements according to their * {@linkplain Comparable natural ordering}. * * @param initialCapacity the initial capacity for this priority queue * @throws IllegalArgumentException if {@code initialCapacity} is less * than 1 */ public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } /** * Creates a {@code PriorityBlockingQueue} with the specified initial * capacity that orders its elements according to the specified * comparator. * * @param initialCapacity the initial capacity for this priority queue * @param comparator the comparator that will be used to order this * priority queue. If {@code null}, the {@linkplain Comparable * natural ordering} of the elements will be used. * @throws IllegalArgumentException if {@code initialCapacity} is less * than 1 */ public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.comparator = comparator; this.queue = new Object[Math.max(1, initialCapacity)]; } /** * Creates a {@code PriorityBlockingQueue} containing the elements * in the specified collection. If the specified collection is a * {@link SortedSet} or a {@link PriorityQueue}, this * priority queue will be ordered according to the same ordering. * Otherwise, this priority queue will be ordered according to the * {@linkplain Comparable natural ordering} of its elements. * * @param c the collection whose elements are to be placed * into this priority queue * @throws ClassCastException if elements of the specified collection * cannot be compared to one another according to the priority * queue's ordering * @throws NullPointerException if the specified collection or any * of its elements are null */ public PriorityBlockingQueue(Collection<? extends E> c) { boolean heapify = true; // true if not known to be in heap order boolean screen = true; // true if must screen for nulls if (c instanceof SortedSet<?>) { SortedSet<? extends E> ss = (SortedSet<? extends E>) c; this.comparator = (Comparator<? super E>) ss.comparator(); heapify = false; } else if (c instanceof PriorityBlockingQueue<?>) { PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c; this.comparator = (Comparator<? super E>) pq.comparator(); screen = false; if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } Object[] es = c.toArray(); int n = es.length; if (c.getClass() != java.util.ArrayList.class) es = Arrays.copyOf(es, n, Object[].class); if (screen && (n == 1 || this.comparator != null)) { for (Object e : es) if (e == null) throw new NullPointerException(); } this.queue = ensureNonEmpty(es); this.size = n; if (heapify) heapify(); }
解读:
PriorityBlockingQueue 内部有一个数组 queue,用来存放队列元素,size用来存放队列元素个数。
独占锁对象lock 用来控制某个时间只能有一个线程可以进行入队、出队操作;notEmpty 条件变量用来实现 take 方法阻塞模式(跟其它阻塞队列相比,这里没有 notFull 条件变量,这是因为PriorityBlockingQueue是无界队列,其put 方法是非阻塞的)。
每次出队都返回优先级最高或者最低的元素,默认使用对象的 compareTo 方法提供比较规则,这意味着队列元素必须实现了 Comparable 接口;如果需要自定义比较规则则可以通过构造函数自定义 comparator。
Note:
allocationspinLock是个自旋锁,它使用 CAS操作来保证某个时间只有一个线程可以扩容队列,状态为 0或者 1,0表示当前没有进行扩容,1表示当前正在扩容。
PriorityBlockingQueue内部是使用平衡二叉树堆实现的,所以直接遍历队列元素不保证元素有序。
offer的方法如下:
/** * Inserts the specified element into this priority queue. * As the queue is unbounded, this method will never return {@code false}. * * @param e the element to add * @return {@code true} (as specified by {@link Queue#offer}) * @throws ClassCastException if the specified element cannot be compared * with elements currently in the priority queue according to the * priority queue's ordering * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] es; while ((n = size) >= (cap = (es = queue).length)) tryGrow(es, cap); try { final Comparator<? super E> cmp; if ((cmp = comparator) == null) siftUpComparable(n, e, es); else siftUpUsingComparator(n, e, es, cmp); size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; }
解读:
由于是无界队列,所以一直返回 true。
put方法的代码如下:
/** * Inserts the specified element into this priority queue. * As the queue is unbounded, this method will never block. * * @param e the element to add * @throws ClassCastException if the specified element cannot be compared * with elements currently in the priority queue according to the * priority queue's ordering * @throws NullPointerException if the specified element is null */ public void put(E e) { offer(e); // never need to block }
解读:
put方法是直接调用offer方法来实现的
poll方法的定义如下:
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return dequeue(); } finally { lock.unlock(); } }
take方法的定义如下:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; }
此处重点研究一下扩容方法的实现,代码如下:
/** * Tries to grow array to accommodate at least one more element * (but normally expand by about 50%), giving up (allowing retry) * on contention (which we expect to be rare). Call only while * holding lock. * * @param array the heap array * @param oldCap the length of the array */ private void tryGrow(Object[] array, int oldCap) { lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; if (allocationSpinLock == 0 && ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) { try { int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } if (newArray == null) // back off if another thread is allocating Thread.yield(); lock.lock(); if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }
解读:
此方法在offer方法中被调用。
此方法在扩容前释放锁。
问题:为什么在扩容前要先释放锁,然后使用 CAS 控制只有一个线程可以扩容成功?
扩容需要花费一定时间,如果在整个扩容期间一直持有锁,则扩容期间其他线程不能进行入队、出队操作,这降低了并发性。
扩容线程扩容时,其他线程原地自旋(会进入tryGrow方法,通过Thread.yield方法让出CPU,让扩容线程在扩容完毕后优先调用 lock.lock()重新获取锁,但这得不到保证),当扩容线程扩容完毕后才退出offer方法中的while循环。
扩容线程扩容完毕后会重置自旋锁变量 allocationSpinLock 为 0,这里没有使用 Unsafe 的 CAS 进行设置是因为某个时间只有一个线程获取到该锁,并且 allocationSpinLock 被修饰为 volatile。
扩容线程扩容完毕后在获取锁后复制当前 queue 里面的元素到新数组。
小结: