PriorityBlockingQueue是BlockingQueue(关于BlockingQueue的介绍,请戳《JUC之BlockingQueue初识》)的一种实现。对于一般的Queue,具有先进先出(FIFO)的特点,即先插入到队列中的元素要先被移出。但PriorityBlockingQueue打破了这个规则,即使插入的顺序不同,但元素值较小(使用一种比较规则进行大小的比较)的元素将优先出队,元素值越小将越优先出队,与元素的插入顺序无关。从这样的能力来看的话,值小的元素会获得出队的优先权,所以PriorityBlockingQueue是具有设置优先级的阻塞队列。其中Priority的意思就是优先。
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(); queue.add(1); queue.add(5); queue.add(3); queue.add(7); queue.add(9); queue.add(6); queue.add(2); queue.add(10); System.out.print("队列中元素的顺序:"); Iterator<Integer> iterator = queue.iterator(); while (iterator.hasNext()) { System.out.print(iterator.next()+"\t"); } System.out.println(); System.out.print("元素出队的顺序:"); for (int i = 0, size=queue.size(); i < size; i++) { System.out.print(queue.poll()+"\t"); } System.out.println();
从上图的输出结果可以发现:
(1)元素的存放顺序与实际插入的顺序不同,可见在入队的时候对元素的顺序进行了调整。
(2)元素的出队顺序与元素的存放顺序不同,可见在出队的时候也对元素的顺序进行了调整。
private static final int DEFAULT_INITIAL_CAPACITY = 11; private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; private transient Object[] queue; private transient int size; private transient Comparator<? super E> comparator; private final ReentrantLock lock; private final Condition notEmpty;
PriorityBlockingQueue是基于数组实现的队列。queue用于存放被添加到队列中的元素。
队列的默认初始化容量大小,也就是数组queue的默认初始化长度。
队列的最大容量大小,也就是数组queue的最大长度。
队列中元素的个数,也就是queue中数据的个数。
一种比较规则,用于对队列中的元素进行比较。
使用ReentrantLock(关于ReentrantLock的介绍,请戳《JUC之ReentrantLock》)保证线程安全,以及Condition来实现线程间的通信。
// 不指定队列初始化容量,则使用默认的初始化容量,11 // 不指定排序规则 public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); }
// 指定初始化容量大小,不指定排序规则 public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); }
// 指定初始化容量大小和排序规则 public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { // 指定的初始化容量大小必须大于0,最小为1 if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; // 初始化数组 this.queue = new Object[initialCapacity]; }
PriorityBlockingQueue实现了BlockingQueue接口,所以在实现方法中要遵循接口中定义和约束的行为,具体的每个方法的约束行为和不同在这里就不介绍了,感兴趣的同学请戳《JUC之BlockingQueue初识》。我们在这里主要看一下PriorityBlockingQueue的优先级排序功能的怎么实现的。
1、向队列中添加元素
add、put等方法借助了offer,因此从offer开始。
public boolean offer(E e) { // 队列是不允许添加null元素 if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; }
(1)cap = (array = queue).length
将队列的底层存放元素的queue数组赋值给array,此时array的内容就是队列中元素的内容。cap的值就是数组的长度,即队列的容量大小。
(2)n = size
size是队列中元素的个数,n也就是队列中元素的个数。
(3)(n = size) >= (cap = (array = queue).length)
队列中元素的个数>=队列的容量,即队列已经满了。对于一般的阻塞队列,队列满了是无法进行扩容的,而PriorityBlockingQueue的可以扩容的。扩容就是对数组Queue进行扩大长度。
(4)tryGrow
关键点在于使用System.arraycopy对数组进行扩容。
private void tryGrow(Object[] array, int oldCap) { lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } if (newArray == null) // back off if another thread is allocating Thread.yield(); lock.lock(); if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }
(5)siftUpComparable
没有指定具体的比较规则时,调用siftUpComparable进行元素的插入。
// k为队列中元素的个数 // x为正在添加到队列中的元素 // array为队列的底层实现queue数组 private static <T> void siftUpComparable(int k, T x, Object[] array) { // 将正在添加到队列中的元素强转成Comparable Comparable<? super T> key = (Comparable<? super T>) x; // k>0,即表示队列中已经存在元素了 while (k > 0) { // 取中间位置的元素下标 int parent = (k - 1) >>> 1; Object e = array[parent]; if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key; }
①Comparable<? super T> key = (Comparable<? super T>) x;
将正在添加到队列中的元素强转成Comparable类型,可见如果没有自定义指定比较器,添加到PriorityBlockingQueue队列中的元素必须是Comparable接口的子类,否则强制转换的时候会出现类型转换异常。
像String、Integer、Long、Float、Double这些类都是实现了Comparable接口。
public class Thing { } public static void main(String[] args) { PriorityBlockingQueue<Thing> queue = new PriorityBlockingQueue(); // Thing没有实现Comparable接口,会抛出java.lang.ClassCastException异常 queue.add(new Thing()); }
②int parent = (k - 1) >>> 1;
>>>是对数字进行取半。k又是队列中元素的个数,这里就是取中间元素的下标。即如果队列中元素的个数为10,则parent为4。(k-1是因为数组的下标是从0开始的,这样计算出来的结果就是中间元素的下标了)
③Object e = array[parent];
e就是队列中元素的中间元素。
④key.compareTo((T) e) >= 0
key经过比较规则比较后不比中间元素小,即优先级并不比中间元素要高。如果满足,则跳出循环。如果不满足,即将中间的值移到数组最后元素的位置,然后再取0到parent一半的位置进行比较,如此往复。
调整规则:
将正在插入的元素A与队列的中间元素M1进行比较,如果A不比M1大则不再进行比较,直接将A插入到数组的最后元素的位置。如果A比M1小,则将M1调整到数组的最后元素的位置,然后再将A与左边剩下一半的数组取中位置的值进行比较,如此反复。
(6)siftUpUsingComparator
使用指定的排序规则。此时添加到队列中的元素类型就没有限制必须实现Comparable接口了。
private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0) break; array[k] = e; k = parent; } array[k] = x; }
public static void main(String[] args) { PriorityBlockingQueue<Thing> queue = new PriorityBlockingQueue(10, new Comparator() { @Override public int compare(Object o1, Object o2) { return o1.hashCode()-o2.hashCode(); } }); // 可以正常添加到队列中而不会报错。 queue.add(new Thing()); } public class Thing { }
2、从队列中移出元素
poll、take等出队的方法都调用了dequeue方法。
private E dequeue() { int n = size - 1; // n<0表示队列为空,直接返回null if (n < 0) return null; else { Object[] array = queue; // 队列中队首的第一个元素 E result = (E) array[0]; // 队列中的最后一个元素,即队尾第一个元素。 E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } }
(1)siftDownComparable
k:0
x:队列中的第一个元素
array:队列中存放元素的数组
n:队列中最后一个元素的下标
private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; int half = n >>> 1; // loop while a non-leaf while (k < half) { int child = (k << 1) + 1; // assume left child is least Object c = array[child]; int right = child + 1; if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; } }
(2)siftDownUsingComparator
private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp) { if (n > 0) { int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < n && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; if (cmp.compare(x, (T) c) <= 0) break; array[k] = c; k = child; } array[k] = x; } }